[#2647] Respect IOV_MAX when call writev in native transport
Motivation: epoll transport fails on gathering write of more then 1024 buffers. As linux supports max. 1024 iov entries when calling writev(...) the epoll transport throws an exception. Thanks again to @blucas to provide me with a reproducer and so helped me to understand what the issue is. Modifications: Make sure we break down the writes if to many buffers are uses for gathering writes. Result: Gathering writes work with any number of buffers
This commit is contained in:
parent
5061be1b03
commit
b440fa840b
@ -46,7 +46,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
public void testGatheringWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testGatheringWrite0(sb, cb, false, true);
|
||||
testGatheringWrite0(sb, cb, data, false, true);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
@ -55,7 +55,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
public void testGatheringWriteNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testGatheringWrite0(sb, cb, false, false);
|
||||
testGatheringWrite0(sb, cb, data, false, false);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
@ -64,7 +64,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
public void testGatheringWriteWithCompositeNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testGatheringWrite0(sb, cb, true, false);
|
||||
testGatheringWrite0(sb, cb, data, true, false);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
@ -73,11 +73,23 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
public void testGatheringWriteWithComposite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testGatheringWrite0(sb, cb, true, true);
|
||||
testGatheringWrite0(sb, cb, data, true, true);
|
||||
}
|
||||
|
||||
// Test for https://github.com/netty/netty/issues/2647
|
||||
@Test(timeout = 30000)
|
||||
public void testGatheringWriteBig() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testGatheringWriteBig(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
byte[] bigData = new byte[1024 * 1024 * 50];
|
||||
random.nextBytes(bigData);
|
||||
testGatheringWrite0(sb, cb, bigData, false, true);
|
||||
}
|
||||
|
||||
private static void testGatheringWrite0(
|
||||
ServerBootstrap sb, Bootstrap cb, boolean composite, boolean autoRead) throws Throwable {
|
||||
ServerBootstrap sb, Bootstrap cb, byte[] data, boolean composite, boolean autoRead) throws Throwable {
|
||||
final TestHandler sh = new TestHandler(autoRead);
|
||||
final TestHandler ch = new TestHandler(autoRead);
|
||||
|
||||
@ -88,7 +100,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||
Channel cc = cb.connect().sync().channel();
|
||||
|
||||
for (int i = 0; i < data.length;) {
|
||||
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
|
||||
int length = Math.min(random.nextInt(1024 * 8), data.length - i);
|
||||
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
|
||||
if (composite && i % 2 == 0) {
|
||||
int split = buf.readableBytes() / 2;
|
||||
|
@ -663,85 +663,104 @@ void incrementPosition(JNIEnv * env, jobject bufObj, int written) {
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) {
|
||||
struct iovec iov[length];
|
||||
int i;
|
||||
int iovidx = 0;
|
||||
for (i = offset; i < length; i++) {
|
||||
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
|
||||
jint pos;
|
||||
// Get the current position using the (*env)->GetIntField if possible and fallback
|
||||
// to slower (*env)->CallIntMethod(...) if needed
|
||||
if (posFieldId == NULL) {
|
||||
pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
|
||||
} else {
|
||||
pos = (*env)->GetIntField(env, bufObj, posFieldId);
|
||||
// Calculate maximal size of iov
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2647
|
||||
int iovLen = IOV_MAX < length ? IOV_MAX : length;
|
||||
struct iovec iov[iovLen];
|
||||
jlong w = 0;
|
||||
while (length > 0) {
|
||||
int i;
|
||||
int iovidx = 0;
|
||||
int loop = IOV_MAX < length ? IOV_MAX : length;
|
||||
int num = offset + loop;
|
||||
for (i = offset; i < num; i++) {
|
||||
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
|
||||
jint pos;
|
||||
// Get the current position using the (*env)->GetIntField if possible and fallback
|
||||
// to slower (*env)->CallIntMethod(...) if needed
|
||||
if (posFieldId == NULL) {
|
||||
pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
|
||||
} else {
|
||||
pos = (*env)->GetIntField(env, bufObj, posFieldId);
|
||||
}
|
||||
jint limit;
|
||||
// Get the current limit using the (*env)->GetIntField if possible and fallback
|
||||
// to slower (*env)->CallIntMethod(...) if needed
|
||||
if (limitFieldId == NULL) {
|
||||
limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
|
||||
} else {
|
||||
limit = (*env)->GetIntField(env, bufObj, limitFieldId);
|
||||
}
|
||||
void *buffer = (*env)->GetDirectBufferAddress(env, bufObj);
|
||||
if (buffer == NULL) {
|
||||
throwRuntimeException(env, "Unable to access address of buffer");
|
||||
return -1;
|
||||
}
|
||||
iov[iovidx].iov_base = buffer + pos;
|
||||
iov[iovidx].iov_len = (size_t) (limit - pos);
|
||||
iovidx++;
|
||||
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, bufObj);
|
||||
}
|
||||
jint limit;
|
||||
// Get the current limit using the (*env)->GetIntField if possible and fallback
|
||||
// to slower (*env)->CallIntMethod(...) if needed
|
||||
if (limitFieldId == NULL) {
|
||||
limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
|
||||
} else {
|
||||
limit = (*env)->GetIntField(env, bufObj, limitFieldId);
|
||||
}
|
||||
void *buffer = (*env)->GetDirectBufferAddress(env, bufObj);
|
||||
if (buffer == NULL) {
|
||||
throwRuntimeException(env, "Unable to access address of buffer");
|
||||
ssize_t res;
|
||||
int err;
|
||||
do {
|
||||
res = writev(fd, iov, loop);
|
||||
// keep on writing if it was interrupted
|
||||
} while(res == -1 && ((err = errno) == EINTR));
|
||||
|
||||
if (res < 0) {
|
||||
if (err == EAGAIN || err == EWOULDBLOCK) {
|
||||
// network stack is saturated we will try again later
|
||||
return w;
|
||||
}
|
||||
if (err == EBADF) {
|
||||
throwClosedChannelException(env);
|
||||
return -1;
|
||||
}
|
||||
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
|
||||
return -1;
|
||||
}
|
||||
iov[iovidx].iov_base = buffer + pos;
|
||||
iov[iovidx].iov_len = (size_t) (limit - pos);
|
||||
iovidx++;
|
||||
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, bufObj);
|
||||
}
|
||||
w += res;
|
||||
|
||||
ssize_t res;
|
||||
int err;
|
||||
do {
|
||||
res = writev(fd, iov, length);
|
||||
// keep on writing if it was interrupted
|
||||
} while(res == -1 && ((err = errno) == EINTR));
|
||||
// update the position of the written buffers
|
||||
int written = res;
|
||||
int a;
|
||||
for (a = 0; a < loop; a++) {
|
||||
int len = iov[a].iov_len;
|
||||
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset);
|
||||
if (len > written) {
|
||||
incrementPosition(env, bufObj, written);
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, bufObj);
|
||||
|
||||
if (res < 0) {
|
||||
if (err == EAGAIN || err == EWOULDBLOCK) {
|
||||
// network stack is saturated we will try again later
|
||||
return 0;
|
||||
}
|
||||
if (err == EBADF) {
|
||||
throwClosedChannelException(env);
|
||||
return -1;
|
||||
}
|
||||
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
|
||||
return -1;
|
||||
}
|
||||
// incomplete write which means the channel is not writable anymore. Return now!
|
||||
return w;
|
||||
} else {
|
||||
incrementPosition(env, bufObj, len);
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, bufObj);
|
||||
written -= len;
|
||||
}
|
||||
|
||||
// update the position of the written buffers
|
||||
int written = res;
|
||||
int a;
|
||||
for (a = 0; a < length; a++) {
|
||||
int pos;
|
||||
int len = iov[a].iov_len;
|
||||
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset);
|
||||
if (len >= written) {
|
||||
incrementPosition(env, bufObj, written);
|
||||
break;
|
||||
} else {
|
||||
incrementPosition(env, bufObj, len);
|
||||
written -= len;
|
||||
}
|
||||
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, bufObj);
|
||||
offset += loop;
|
||||
length -= loop;
|
||||
}
|
||||
return res;
|
||||
return w;
|
||||
}
|
||||
|
||||
jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) {
|
||||
|
@ -14,7 +14,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
#include <jni.h>
|
||||
|
||||
#include <limits.h>
|
||||
|
||||
#define EPOLL_READ 0x01
|
||||
#define EPOLL_WRITE 0x02
|
||||
@ -27,6 +27,12 @@
|
||||
#define SO_REUSEPORT 15
|
||||
#endif /* SO_REUSEPORT */
|
||||
|
||||
// Define IOV_MAX if not found to limit the iov size on writev calls
|
||||
// See https://github.com/netty/netty/issues/2647
|
||||
#ifndef IOV_MAX
|
||||
#define IOV_MAX 1024
|
||||
#endif /* IOV_MAX */
|
||||
|
||||
jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz);
|
||||
void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value);
|
||||
void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd);
|
||||
|
@ -123,7 +123,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
|
||||
private void writeBytesMultiple(
|
||||
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException {
|
||||
|
||||
int nioBufferCnt = in.nioBufferCount();
|
||||
long expectedWrittenBytes = in.nioBufferSize();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user