[#2647] Respect IOV_MAX when calling writev in native transport

Motivation:

The epoll transport fails on gathering writes of more than 1024 buffers. Because Linux supports at most 1024 iov entries per writev(...) call, the epoll transport throws an exception.

Thanks again to @blucas for providing a reproducer, which helped me understand what the issue is.

Modifications:

Make sure we break down the writes if too many buffers are used for gathering writes.

Result:

Gathering writes work with any number of buffers
This commit is contained in:
Norman Maurer 2014-07-09 12:18:48 +02:00
parent ac8ac59148
commit fb22d34925
4 changed files with 151 additions and 87 deletions

View File

@ -46,7 +46,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
} }
public void testGatheringWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testGatheringWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, false, true); testGatheringWrite0(sb, cb, data, false, true);
} }
@Test(timeout = 30000) @Test(timeout = 30000)
@ -55,7 +55,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
} }
public void testGatheringWriteNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testGatheringWriteNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, false, false); testGatheringWrite0(sb, cb, data, false, false);
} }
@Test(timeout = 30000) @Test(timeout = 30000)
@ -64,7 +64,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
} }
public void testGatheringWriteWithCompositeNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testGatheringWriteWithCompositeNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, true, false); testGatheringWrite0(sb, cb, data, true, false);
} }
@Test(timeout = 30000) @Test(timeout = 30000)
@ -73,11 +73,23 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
} }
public void testGatheringWriteWithComposite(ServerBootstrap sb, Bootstrap cb) throws Throwable { public void testGatheringWriteWithComposite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testGatheringWrite0(sb, cb, true, true); testGatheringWrite0(sb, cb, data, true, true);
}
// Test for https://github.com/netty/netty/issues/2647
@Test(timeout = 30000)
public void testGatheringWriteBig() throws Throwable {
run();
}
public void testGatheringWriteBig(ServerBootstrap sb, Bootstrap cb) throws Throwable {
byte[] bigData = new byte[1024 * 1024 * 50];
random.nextBytes(bigData);
testGatheringWrite0(sb, cb, bigData, false, true);
} }
private static void testGatheringWrite0( private static void testGatheringWrite0(
ServerBootstrap sb, Bootstrap cb, boolean composite, boolean autoRead) throws Throwable { ServerBootstrap sb, Bootstrap cb, byte[] data, boolean composite, boolean autoRead) throws Throwable {
final TestHandler sh = new TestHandler(autoRead); final TestHandler sh = new TestHandler(autoRead);
final TestHandler ch = new TestHandler(autoRead); final TestHandler ch = new TestHandler(autoRead);
@ -88,7 +100,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
Channel cc = cb.connect().sync().channel(); Channel cc = cb.connect().sync().channel();
for (int i = 0; i < data.length;) { for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i); int length = Math.min(random.nextInt(1024 * 8), data.length - i);
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length); ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
if (composite && i % 2 == 0) { if (composite && i % 2 == 0) {
int split = buf.readableBytes() / 2; int split = buf.readableBytes() / 2;

View File

@ -711,94 +711,141 @@ jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint leng
} }
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) { JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) {
struct iovec iov[length]; // Calculate maximal size of iov
int i; //
int iovidx = 0; // See https://github.com/netty/netty/issues/2647
for (i = offset; i < length; i++) { int iovLen = IOV_MAX < length ? IOV_MAX : length;
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i); struct iovec iov[iovLen];
jint pos; jlong w = 0;
// Get the current position using the (*env)->GetIntField if possible and fallback while (length > 0) {
// to slower (*env)->CallIntMethod(...) if needed int i;
if (posFieldId == NULL) { int iovidx = 0;
pos = (*env)->CallIntMethod(env, bufObj, posId, NULL); int loop = IOV_MAX < length ? IOV_MAX : length;
} else { int num = offset + loop;
pos = (*env)->GetIntField(env, bufObj, posFieldId); for (i = offset; i < num; i++) {
} jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
jint limit; jint pos;
// Get the current limit using the (*env)->GetIntField if possible and fallback // Get the current position using the (*env)->GetIntField if possible and fallback
// to slower (*env)->CallIntMethod(...) if needed // to slower (*env)->CallIntMethod(...) if needed
if (limitFieldId == NULL) { if (posFieldId == NULL) {
limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL); pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
} else { } else {
limit = (*env)->GetIntField(env, bufObj, limitFieldId); pos = (*env)->GetIntField(env, bufObj, posFieldId);
} }
void *buffer = (*env)->GetDirectBufferAddress(env, bufObj); jint limit;
if (buffer == NULL) { // Get the current limit using the (*env)->GetIntField if possible and fallback
throwRuntimeException(env, "Unable to access address of buffer"); // to slower (*env)->CallIntMethod(...) if needed
return -1; if (limitFieldId == NULL) {
} limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
iov[iovidx].iov_base = buffer + pos; } else {
iov[iovidx].iov_len = (size_t) (limit - pos); limit = (*env)->GetIntField(env, bufObj, limitFieldId);
iovidx++; }
void *buffer = (*env)->GetDirectBufferAddress(env, bufObj);
if (buffer == NULL) {
throwRuntimeException(env, "Unable to access address of buffer");
return -1;
}
iov[iovidx].iov_base = buffer + pos;
iov[iovidx].iov_len = (size_t) (limit - pos);
iovidx++;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
}
jlong res = writev0(env, clazz, fd, iov, loop);
if (res <= 0) {
return res < 0 ? res : w;
}
w += res;
offset += loop;
length -= loop;
// update the position of the written buffers
int written = res;
int a;
for (a = 0; a < loop; a++) {
int len = iov[a].iov_len;
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset);
if (len > written) {
incrementPosition(env, bufObj, written);
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
// incomplete write which means the channel is not writable anymore. Return now!
return w;
} else {
incrementPosition(env, bufObj, len);
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
written -= len;
}
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
}
jlong res = writev0(env, clazz, fd, iov, length);
if (res <= 0) {
return res;
}
// update the position of the written buffers
int written = res;
int a;
for (a = 0; a < length; a++) {
int len = iov[a].iov_len;
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset);
if (len >= written) {
incrementPosition(env, bufObj, written);
break;
} else {
incrementPosition(env, bufObj, len);
written -= len;
} }
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
} }
return res; return w;
} }
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) { JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) {
struct iovec iov[length]; // Calculate maximal size of iov
int i; //
int iovidx = 0; // See https://github.com/netty/netty/issues/2647
for (i = offset; i < length; i++) { int iovLen = IOV_MAX < length ? IOV_MAX : length;
jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i); struct iovec iov[iovLen];
jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId); jlong w = 0;
jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId); while (length > 0) {
void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId); int i;
int iovidx = 0;
int loop = IOV_MAX < length ? IOV_MAX : length;
int num = offset + loop;
for (i = offset; i < num; i++) {
jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i);
jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId);
jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId);
void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId);
iov[iovidx].iov_base = memoryAddress + readerIndex; iov[iovidx].iov_base = memoryAddress + readerIndex;
iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex); iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex);
iovidx++; iovidx++;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns. // Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created. // Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
// //
// See https://github.com/netty/netty/issues/2623 // See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, addressEntry); (*env)->DeleteLocalRef(env, addressEntry);
} }
jlong res = writev0(env, clazz, fd, iov, length); jlong res = writev0(env, clazz, fd, iov, loop);
if (res <= 0) { if (res <= 0) {
return res; return res < 0 ? res : w;
}
w += res;
offset += loop;
length -= loop;
// update the position of the written buffers
int written = res;
int a;
for (a = 0; a < loop; a++) {
int len = iov[a].iov_len;
if (len > written) {
// incomplete write which means the channel is not writable anymore. Return now!
return w;
}
written -= len;
}
} }
return w;
} }
jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) {

View File

@ -14,7 +14,7 @@
* under the License. * under the License.
*/ */
#include <jni.h> #include <jni.h>
#include <limits.h>
#define EPOLL_READ 0x01 #define EPOLL_READ 0x01
#define EPOLL_WRITE 0x02 #define EPOLL_WRITE 0x02
@ -27,6 +27,12 @@
#define SO_REUSEPORT 15 #define SO_REUSEPORT 15
#endif /* SO_REUSEPORT */ #endif /* SO_REUSEPORT */
// Define IOV_MAX if not found to limit the iov size on writev calls
// See https://github.com/netty/netty/issues/2647
#ifndef IOV_MAX
#define IOV_MAX 1024
#endif /* IOV_MAX */
jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz); jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz);
void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value); void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value);
void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd); void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd);

View File

@ -133,7 +133,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
int nioBufferCnt = in.addressCount(); int nioBufferCnt = in.addressCount();
long expectedWrittenBytes = in.addressSize(); long expectedWrittenBytes = in.addressSize();
long localWrittenBytes = Native.writevAddresses(fd, nioBuffers, 0, nioBufferCnt); long localWrittenBytes = Native.writevAddresses(fd, nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes < expectedWrittenBytes) { if (localWrittenBytes < expectedWrittenBytes) {