[#2647] Handle IOV_MAX in java code
Motivation: The handling of IOV_MAX was done in JNI code base which makes stuff really complicated to maintain etc. Modifications: Move handling of IOV_MAX to java code to simplify stuff Result: Cleaner code.
This commit is contained in:
parent
bc10131f71
commit
db790123fd
@ -663,104 +663,63 @@ void incrementPosition(JNIEnv * env, jobject bufObj, int written) {
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) {
|
||||
// Calculate maximal size of iov
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2647
|
||||
int iovLen = IOV_MAX < length ? IOV_MAX : length;
|
||||
struct iovec iov[iovLen];
|
||||
jlong w = 0;
|
||||
while (length > 0) {
|
||||
int i;
|
||||
int iovidx = 0;
|
||||
int loop = IOV_MAX < length ? IOV_MAX : length;
|
||||
int num = offset + loop;
|
||||
for (i = offset; i < num; i++) {
|
||||
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
|
||||
jint pos;
|
||||
// Get the current position using the (*env)->GetIntField if possible and fallback
|
||||
// to slower (*env)->CallIntMethod(...) if needed
|
||||
if (posFieldId == NULL) {
|
||||
pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
|
||||
} else {
|
||||
pos = (*env)->GetIntField(env, bufObj, posFieldId);
|
||||
}
|
||||
jint limit;
|
||||
// Get the current limit using the (*env)->GetIntField if possible and fallback
|
||||
// to slower (*env)->CallIntMethod(...) if needed
|
||||
if (limitFieldId == NULL) {
|
||||
limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
|
||||
} else {
|
||||
limit = (*env)->GetIntField(env, bufObj, limitFieldId);
|
||||
}
|
||||
void *buffer = (*env)->GetDirectBufferAddress(env, bufObj);
|
||||
if (buffer == NULL) {
|
||||
throwRuntimeException(env, "Unable to access address of buffer");
|
||||
return -1;
|
||||
}
|
||||
iov[iovidx].iov_base = buffer + pos;
|
||||
iov[iovidx].iov_len = (size_t) (limit - pos);
|
||||
iovidx++;
|
||||
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, bufObj);
|
||||
struct iovec iov[length];
|
||||
int iovidx = 0;
|
||||
int i;
|
||||
int num = offset + length;
|
||||
for (i = offset; i < num; i++) {
|
||||
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
|
||||
jint pos;
|
||||
// Get the current position using the (*env)->GetIntField if possible and fallback
|
||||
// to slower (*env)->CallIntMethod(...) if needed
|
||||
if (posFieldId == NULL) {
|
||||
pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
|
||||
} else {
|
||||
pos = (*env)->GetIntField(env, bufObj, posFieldId);
|
||||
}
|
||||
ssize_t res;
|
||||
int err;
|
||||
do {
|
||||
res = writev(fd, iov, loop);
|
||||
// keep on writing if it was interrupted
|
||||
} while(res == -1 && ((err = errno) == EINTR));
|
||||
|
||||
if (res < 0) {
|
||||
if (err == EAGAIN || err == EWOULDBLOCK) {
|
||||
// network stack is saturated we will try again later
|
||||
return w;
|
||||
}
|
||||
if (err == EBADF) {
|
||||
throwClosedChannelException(env);
|
||||
return -1;
|
||||
}
|
||||
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
|
||||
jint limit;
|
||||
// Get the current limit using the (*env)->GetIntField if possible and fallback
|
||||
// to slower (*env)->CallIntMethod(...) if needed
|
||||
if (limitFieldId == NULL) {
|
||||
limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
|
||||
} else {
|
||||
limit = (*env)->GetIntField(env, bufObj, limitFieldId);
|
||||
}
|
||||
void *buffer = (*env)->GetDirectBufferAddress(env, bufObj);
|
||||
if (buffer == NULL) {
|
||||
throwRuntimeException(env, "Unable to access address of buffer");
|
||||
return -1;
|
||||
}
|
||||
iov[iovidx].iov_base = buffer + pos;
|
||||
iov[iovidx].iov_len = (size_t) (limit - pos);
|
||||
iovidx++;
|
||||
|
||||
w += res;
|
||||
|
||||
// update the position of the written buffers
|
||||
int written = res;
|
||||
int a;
|
||||
for (a = 0; a < loop; a++) {
|
||||
int len = iov[a].iov_len;
|
||||
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset);
|
||||
if (len > written) {
|
||||
incrementPosition(env, bufObj, written);
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, bufObj);
|
||||
|
||||
// incomplete write which means the channel is not writable anymore. Return now!
|
||||
return w;
|
||||
} else {
|
||||
incrementPosition(env, bufObj, len);
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, bufObj);
|
||||
written -= len;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
offset += loop;
|
||||
length -= loop;
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, bufObj);
|
||||
}
|
||||
return w;
|
||||
ssize_t res;
|
||||
int err;
|
||||
do {
|
||||
res = writev(fd, iov, length);
|
||||
// keep on writing if it was interrupted
|
||||
} while(res == -1 && ((err = errno) == EINTR));
|
||||
|
||||
if (res < 0) {
|
||||
if (err == EAGAIN || err == EWOULDBLOCK) {
|
||||
// network stack is saturated we will try again later
|
||||
return 0;
|
||||
}
|
||||
if (err == EBADF) {
|
||||
throwClosedChannelException(env);
|
||||
return -1;
|
||||
}
|
||||
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
|
||||
return -1;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) {
|
||||
@ -1181,3 +1140,7 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn
|
||||
throwRuntimeException(env, exceptionMessage("Error during uname(...): ", err));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz) {
|
||||
return IOV_MAX;
|
||||
}
|
@ -92,3 +92,4 @@ jint Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv *env, jclass claz
|
||||
jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv *env, jclass clazz, jint fd);
|
||||
|
||||
jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz);
|
||||
jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz);
|
||||
|
@ -128,20 +128,43 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
boolean done = false;
|
||||
boolean setEpollOut = false;
|
||||
long writtenBytes = 0;
|
||||
int offset = 0;
|
||||
int end = offset + nioBufferCnt;
|
||||
int spinCount = config.getWriteSpinCount();
|
||||
loop: while (nioBufferCnt > 0) {
|
||||
for (int i = spinCount - 1; i >= 0; i --) {
|
||||
int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt;
|
||||
|
||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||
long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
setEpollOut = true;
|
||||
break;
|
||||
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
setEpollOut = true;
|
||||
break loop;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
writtenBytes += localWrittenBytes;
|
||||
|
||||
while (offset < end && localWrittenBytes > 0) {
|
||||
ByteBuffer buffer = nioBuffers[offset];
|
||||
int pos = buffer.position();
|
||||
int bytes = buffer.limit() - pos;
|
||||
if (bytes > localWrittenBytes) {
|
||||
buffer.position(pos + (int) localWrittenBytes);
|
||||
// incomplete write
|
||||
break;
|
||||
} else {
|
||||
offset++;
|
||||
nioBufferCnt--;
|
||||
localWrittenBytes -= bytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
writtenBytes += localWrittenBytes;
|
||||
|
||||
if (expectedWrittenBytes == 0) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (done) {
|
||||
// Release all buffers
|
||||
for (int i = msgCount; i > 0; i --) {
|
||||
|
@ -51,6 +51,7 @@ final class Native {
|
||||
public static final int EPOLLOUT = 0x02;
|
||||
public static final int EPOLLACCEPT = 0x04;
|
||||
public static final int EPOLLRDHUP = 0x08;
|
||||
public static int IOV_MAX = iovMax();
|
||||
|
||||
public static native int eventFd();
|
||||
public static native void eventFdWrite(int fd, long value);
|
||||
@ -218,6 +219,9 @@ final class Native {
|
||||
}
|
||||
|
||||
public static native String kernelVersion();
|
||||
|
||||
private static native int iovMax();
|
||||
|
||||
private Native() {
|
||||
// utility
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user