[#2647] Handle IOV_MAX in java code

Motivation:

The handling of IOV_MAX was done in JNI code base which makes stuff really complicated to maintain etc.

Modifications:

Move handling of IOV_MAX to java code to simplify stuff

Result:

Cleaner code.
This commit is contained in:
Norman Maurer 2014-07-17 16:00:53 +02:00
parent e258b48f8d
commit dfa00508d9
4 changed files with 134 additions and 160 deletions

View File

@ -711,141 +711,69 @@ jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint leng
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) {
// Calculate maximal size of iov
//
// See https://github.com/netty/netty/issues/2647
int iovLen = IOV_MAX < length ? IOV_MAX : length;
struct iovec iov[iovLen];
jlong w = 0;
while (length > 0) {
int i;
int iovidx = 0;
int loop = IOV_MAX < length ? IOV_MAX : length;
int num = offset + loop;
for (i = offset; i < num; i++) {
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
jint pos;
// Get the current position using the (*env)->GetIntField if possible and fallback
// to slower (*env)->CallIntMethod(...) if needed
if (posFieldId == NULL) {
pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
} else {
pos = (*env)->GetIntField(env, bufObj, posFieldId);
}
jint limit;
// Get the current limit using the (*env)->GetIntField if possible and fallback
// to slower (*env)->CallIntMethod(...) if needed
if (limitFieldId == NULL) {
limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
} else {
limit = (*env)->GetIntField(env, bufObj, limitFieldId);
}
void *buffer = (*env)->GetDirectBufferAddress(env, bufObj);
if (buffer == NULL) {
throwRuntimeException(env, "Unable to access address of buffer");
return -1;
}
iov[iovidx].iov_base = buffer + pos;
iov[iovidx].iov_len = (size_t) (limit - pos);
iovidx++;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
struct iovec iov[length];
int iovidx = 0;
int i;
int num = offset + length;
for (i = offset; i < num; i++) {
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
jint pos;
// Get the current position using the (*env)->GetIntField if possible and fallback
// to slower (*env)->CallIntMethod(...) if needed
if (posFieldId == NULL) {
pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
} else {
pos = (*env)->GetIntField(env, bufObj, posFieldId);
}
jlong res = writev0(env, clazz, fd, iov, loop);
if (res <= 0) {
return res < 0 ? res : w;
jint limit;
// Get the current limit using the (*env)->GetIntField if possible and fallback
// to slower (*env)->CallIntMethod(...) if needed
if (limitFieldId == NULL) {
limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
} else {
limit = (*env)->GetIntField(env, bufObj, limitFieldId);
}
w += res;
// update the position of the written buffers
int written = res;
int a;
for (a = 0; a < loop; a++) {
int len = iov[a].iov_len;
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset);
if (len > written) {
incrementPosition(env, bufObj, written);
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
// incomplete write which means the channel is not writable anymore. Return now!
return w;
} else {
incrementPosition(env, bufObj, len);
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
written -= len;
}
void *buffer = (*env)->GetDirectBufferAddress(env, bufObj);
if (buffer == NULL) {
throwRuntimeException(env, "Unable to access address of buffer");
return -1;
}
offset += loop;
length -= loop;
iov[iovidx].iov_base = buffer + pos;
iov[iovidx].iov_len = (size_t) (limit - pos);
iovidx++;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, bufObj);
}
return w;
return writev0(env, clazz, fd, iov, length);
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) {
// Calculate maximal size of iov
//
// See https://github.com/netty/netty/issues/2647
int iovLen = IOV_MAX < length ? IOV_MAX : length;
struct iovec iov[iovLen];
jlong w = 0;
while (length > 0) {
int i;
int iovidx = 0;
int loop = IOV_MAX < length ? IOV_MAX : length;
int num = offset + loop;
for (i = offset; i < num; i++) {
jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i);
jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId);
jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId);
void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId);
struct iovec iov[length];
int iovidx = 0;
int i;
int num = offset + length;
for (i = offset; i < num; i++) {
jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i);
jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId);
jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId);
void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId);
iov[iovidx].iov_base = memoryAddress + readerIndex;
iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex);
iovidx++;
iov[iovidx].iov_base = memoryAddress + readerIndex;
iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex);
iovidx++;
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, addressEntry);
}
jlong res = writev0(env, clazz, fd, iov, loop);
if (res <= 0) {
return res < 0 ? res : w;
}
w += res;
offset += loop;
length -= loop;
// update the position of the written buffers
int written = res;
int a;
for (a = 0; a < loop; a++) {
int len = iov[a].iov_len;
if (len > written) {
// incomplete write which means the channel is not writable anymore. Return now!
return w;
}
written -= len;
}
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
//
// See https://github.com/netty/netty/issues/2623
(*env)->DeleteLocalRef(env, addressEntry);
}
return w;
return writev0(env, clazz, fd, iov, length);
}
jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) {
@ -1266,3 +1194,7 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn
throwRuntimeException(env, exceptionMessage("Error during uname(...): ", err));
return NULL;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz) {
return IOV_MAX;
}

View File

@ -93,3 +93,4 @@ jint Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv *env, jclass claz
jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv *env, jclass clazz, jint fd);
jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz);

View File

@ -129,41 +129,51 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
private void writeBytesMultiple(
EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] nioBuffers) throws IOException {
EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses) throws IOException {
int nioBufferCnt = in.addressCount();
int addressCnt = in.addressCount();
long expectedWrittenBytes = in.addressSize();
long localWrittenBytes = Native.writevAddresses(fd, nioBuffers, 0, nioBufferCnt);
boolean done = false;
boolean setEpollOut = false;
long writtenBytes = 0;
int offset = 0;
int end = offset + addressCnt;
int spinCount = config.getWriteSpinCount();
loop: while (addressCnt > 0) {
for (int i = spinCount - 1; i >= 0; i --) {
int cnt = addressCnt > Native.IOV_MAX? Native.IOV_MAX : addressCnt;
if (localWrittenBytes < expectedWrittenBytes) {
setEpollOut();
long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt);
if (localWrittenBytes == 0) {
setEpollOut = true;
break loop;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < localWrittenBytes) {
in.remove();
localWrittenBytes -= readableBytes;
} else if (readableBytes > localWrittenBytes) {
buf.readerIndex(readerIndex + (int) localWrittenBytes);
in.progress(localWrittenBytes);
break;
} else { // readable == writtenBytes
in.remove();
break;
while (offset < end && localWrittenBytes > 0) {
AddressEntry address = addresses[offset];
int readerIndex = address.readerIndex;
int bytes = address.writerIndex - readerIndex;
if (bytes > localWrittenBytes) {
address.readerIndex += (int) localWrittenBytes;
// incomplete write
break;
} else {
offset++;
addressCnt--;
localWrittenBytes -= bytes;
}
}
}
} else {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut);
}
private void writeBytesMultiple(
@ -174,20 +184,48 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
boolean done = false;
boolean setEpollOut = false;
long writtenBytes = 0;
int offset = 0;
int end = offset + nioBufferCnt;
int spinCount = config.getWriteSpinCount();
loop: while (nioBufferCnt > 0) {
for (int i = spinCount - 1; i >= 0; i --) {
int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setEpollOut = true;
break;
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt);
if (localWrittenBytes == 0) {
setEpollOut = true;
break loop;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
while (offset < end && localWrittenBytes > 0) {
ByteBuffer buffer = nioBuffers[offset];
int pos = buffer.position();
int bytes = buffer.limit() - pos;
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
break;
} else {
offset++;
nioBufferCnt--;
localWrittenBytes -= bytes;
}
}
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut);
}
private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
boolean done, boolean setEpollOut) {
if (done) {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
@ -224,7 +262,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
incompleteWrite(setEpollOut);
}
}
private void incompleteWrite(boolean setEpollOut) {
// Did not write completely.
if (setEpollOut) {

View File

@ -52,6 +52,7 @@ final class Native {
public static final int EPOLLOUT = 0x02;
public static final int EPOLLACCEPT = 0x04;
public static final int EPOLLRDHUP = 0x08;
public static int IOV_MAX = iovMax();
public static native int eventFd();
public static native void eventFdWrite(int fd, long value);
@ -222,6 +223,9 @@ final class Native {
}
public static native String kernelVersion();
private static native int iovMax();
private Native() {
// utility
}