From 6bef9c8489028b9941b8c48dddaec2f76d5b0f53 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 17 Feb 2014 15:19:01 +0100 Subject: [PATCH] Use optimized write and read calls if memoryAddress is present. Part of [#2239] --- .../main/c/io_netty_channel_epoll_Native.c | 50 +++++++++++-------- .../main/c/io_netty_channel_epoll_Native.h | 2 + .../channel/epoll/EpollSocketChannel.java | 17 +++++-- .../java/io/netty/channel/epoll/Native.java | 5 ++ 4 files changed, 49 insertions(+), 25 deletions(-) diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index 05b5edb5f4..e2b504f5cb 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -461,14 +461,8 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * e } } -JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) { - // TODO: We could also maybe pass the address in directly and so eliminate this call - // not sure if this would buy us much. So some testing is needed. - void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); - if (buffer == NULL) { - throwRuntimeException(env, "Unable to access address of buffer"); - return -1; - } + +jint write0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { ssize_t res; int err; do { @@ -488,14 +482,22 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jc throwIOException(env, exceptionMessage("Error while write(...): ", err)); return -1; } - if (posFieldId == NULL) { - (*env)->CallObjectMethod(env, jbuffer, updatePosId, pos + res); - } else { - (*env)->SetIntField(env, jbuffer, posFieldId, pos + res); - } return (jint) res; } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) { + void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + return write0(env, clazz, fd, buffer, pos, limit); +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit) { + return write0(env, clazz, fd, (void *) address, pos, limit); +} + void incrementPosition(JNIEnv * env, jobject bufObj, int written) { // Get the current position using the (*env)->GetIntField if possible and fallback // to slower (*env)->CallIntMethod(...) if needed @@ -578,14 +580,7 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, return res; } -JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) { - // TODO: We could also maybe pass the address in directly and so eliminate this call - // not sure if this would buy us much. So some testing is needed. - void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); - if (buffer == NULL) { - throwRuntimeException(env, "Unable to access address of buffer"); - return -1; - } +jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { ssize_t res; int err; do { @@ -613,6 +608,19 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jcl return (jint) res; } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) { + void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + return read0(env, clazz, fd, buffer, pos, limit); +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit) { + return read0(env, clazz, fd, (void*) address, pos, limit); +} + JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd) { if (close(fd) < 0) { throwIOException(env, "Error closing file descriptor"); diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index 3b93bb0c42..2d76071760 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -30,8 +30,10 @@ void Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv * env, jclass clazz, void Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id); void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint efd, jint fd); jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); +jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length); jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); +jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd); void Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write); jint Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index ebba1bb381..cb00276f6e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -115,8 +115,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So int readerIndex = buf.readerIndex(); int localFlushedAmount; if (buf.nioBufferCount() == 1) { - ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, readable); - localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit()); + if (buf.hasMemoryAddress()) { + localFlushedAmount = Native.writeAddress(fd, buf.memoryAddress(), readerIndex, buf.writerIndex()); + } else { + ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, readable); + localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit()); + } } else { // backed by more then one buffer, do a gathering write... ByteBuffer[] nioBufs = buf.nioBuffers(); @@ -509,8 +513,13 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So */ private int doReadBytes(ByteBuf byteBuf) throws Exception { int writerIndex = byteBuf.writerIndex(); - ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes()); - int localReadAmount = Native.read(fd, buf, buf.position(), buf.limit()); + int localReadAmount; + if (byteBuf.hasMemoryAddress()) { + localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity()); + } else { + ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes()); + localReadAmount = Native.read(fd, buf, buf.position(), buf.limit()); + } if (localReadAmount > 0) { byteBuf.writerIndex(writerIndex + localReadAmount); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index a45ea67b3a..e255149d75 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -60,9 +60,14 @@ final class Native { // File-descriptor operations public static native void close(int fd) throws IOException; + public static native int write(int fd, ByteBuffer buf, int pos, int limit) throws IOException; + public static native int writeAddress(int fd, long address, int pos, int limit) throws IOException; + public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException; public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException; + public static native int readAddress(int fd, long address, int pos, int limit) throws IOException; + public static native long sendfile(int dest, DefaultFileRegion src, long offset, long length) throws IOException; // socket operations