diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_eventarray.c b/transport-native-kqueue/src/main/c/netty_kqueue_eventarray.c index 0fb0c4cffd..dd38cf544c 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_eventarray.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_eventarray.c @@ -24,104 +24,26 @@ #include "netty_unix_jni.h" #include "netty_unix_util.h" -static jfieldID kqueueJniPtrFieldId = NULL; - -static void netty_kqueue_eventarray_evSet(JNIEnv* env, jclass clzz, jlong keventAddress, jobject channel, jint ident, jshort filter, jshort flags, jint fflags) { - // Create a global pointer, cast it as a long, and retain it in java to re-use and free later. - jlong jniSelfPtr = (*env)->GetLongField(env, channel, kqueueJniPtrFieldId); - if (jniSelfPtr == 0) { - jniSelfPtr = (jlong) (*env)->NewGlobalRef(env, channel); - (*env)->SetLongField(env, channel, kqueueJniPtrFieldId, jniSelfPtr); - } else if ((flags & EV_DELETE) != 0) { - // If the event is deleted, make sure it no longer has a reference to the jniSelfPtr because it shouldn't be used after this point. - jniSelfPtr = 0; - } - EV_SET((struct kevent*) keventAddress, ident, filter, flags, fflags, 0, (jobject) jniSelfPtr); -} - -static jobject netty_kqueue_eventarray_getChannel(JNIEnv* env, jclass clazz, jlong keventAddress) { - struct kevent* event = (struct kevent*) keventAddress; - return event->udata == NULL ? NULL : (jobject) event->udata; -} - -static void netty_kqueue_eventarray_deleteGlobalRefs(JNIEnv* env, jclass clazz, jlong channelAddressStart, jlong channelAddressEnd) { - // Iterate over an array of longs, which are really pointers to the jobject NewGlobalRef created above in evSet - // and delete each one. The field has already been set to 0 in java. - jlong* itr = (jlong*) channelAddressStart; - const jlong* end = (jlong*) channelAddressEnd; - for (; itr != end; ++itr) { - (*env)->DeleteGlobalRef(env, (jobject) *itr); - } +static void netty_kqueue_eventarray_evSet(JNIEnv* env, jclass clzz, jlong keventAddress, jint ident, jshort filter, jshort flags, jint fflags) { + EV_SET((struct kevent*) keventAddress, ident, filter, flags, fflags, 0, NULL); } // JNI Method Registration Table Begin static const JNINativeMethod fixed_method_table[] = { - { "deleteGlobalRefs", "(JJ)V", (void *) netty_kqueue_eventarray_deleteGlobalRefs } - // "evSet" has a dynamic signature - // "getChannel" has a dynamic signature + { "evSet", "(JISSI)V", (void *) netty_kqueue_eventarray_evSet } }; static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); -static jint dynamicMethodsTableSize() { - return fixed_method_table_size + 2; -} - -static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) { - JNINativeMethod* dynamicMethods = malloc(sizeof(JNINativeMethod) * dynamicMethodsTableSize()); - memcpy(dynamicMethods, fixed_method_table, sizeof(fixed_method_table)); - char* dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/kqueue/AbstractKQueueChannel;ISSI)V"); - JNINativeMethod* dynamicMethod = &dynamicMethods[fixed_method_table_size]; - dynamicMethod->name = "evSet"; - dynamicMethod->signature = netty_unix_util_prepend("(JL", dynamicTypeName); - dynamicMethod->fnPtr = (void *) netty_kqueue_eventarray_evSet; - free(dynamicTypeName); - - ++dynamicMethod; - dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/kqueue/AbstractKQueueChannel;"); - dynamicMethod->name = "getChannel"; - dynamicMethod->signature = netty_unix_util_prepend("(J)L", dynamicTypeName); - dynamicMethod->fnPtr = (void *) netty_kqueue_eventarray_getChannel; - free(dynamicTypeName); - return dynamicMethods; -} - -static void freeDynamicMethodsTable(JNINativeMethod* dynamicMethods) { - jint fullMethodTableSize = dynamicMethodsTableSize(); - jint i = fixed_method_table_size; - for (; i < fullMethodTableSize; ++i) { - free(dynamicMethods[i].signature); - } - free(dynamicMethods); -} // JNI Method Registration Table End jint netty_kqueue_eventarray_JNI_OnLoad(JNIEnv* env, const char* packagePrefix) { - JNINativeMethod* dynamicMethods = createDynamicMethodsTable(packagePrefix); if (netty_unix_util_register_natives(env, packagePrefix, "io/netty/channel/kqueue/KQueueEventArray", - dynamicMethods, - dynamicMethodsTableSize()) != 0) { - freeDynamicMethodsTable(dynamicMethods); + fixed_method_table, + fixed_method_table_size) != 0) { return JNI_ERR; } - freeDynamicMethodsTable(dynamicMethods); - dynamicMethods = NULL; - - char* nettyClassName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/kqueue/AbstractKQueueChannel"); - jclass kqueueChannelCls = (*env)->FindClass(env, nettyClassName); - free(nettyClassName); - nettyClassName = NULL; - if (kqueueChannelCls == NULL) { - return JNI_ERR; - } - - kqueueJniPtrFieldId = (*env)->GetFieldID(env, kqueueChannelCls, "jniSelfPtr", "J"); - if (kqueueJniPtrFieldId == NULL) { - netty_unix_errors_throwRuntimeException(env, "failed to get field ID: AbstractKQueueChannel.jniSelfPtr"); - return JNI_ERR; - } - return NETTY_JNI_VERSION; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index a98ee5777e..a32a0351a0 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -69,15 +69,6 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan private boolean writeFilterEnabled; boolean readReadyRunnablePending; boolean inputClosedSeenErrorOnRead; - /** - * This member variable means we don't have to have a map in {@link KQueueEventLoop} which associates the FDs - * from kqueue to instances of this class. This field will be initialized by JNI when modifying kqueue events. - * If there is no global reference when JNI gets a kqueue evSet call (aka this field is 0) then a global reference - * will be created and the address will be saved in this member variable. Then when we process a kevent in Java - * we can ask JNI to give us the {@link AbstractKQueueChannel} that corresponds to that event. - */ - long jniSelfPtr; - protected volatile boolean active; private volatile SocketAddress local; private volatile SocketAddress remote; @@ -213,6 +204,9 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan // make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the // new EventLoop. readReadyRunnablePending = false; + + ((KQueueEventLoop) eventLoop()).add(this); + // Add the write event first so we get notified of connection refused on the client side! if (writeFilterEnabled) { evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventArray.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventArray.java index 4a263280a6..43b5e62663 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventArray.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventArray.java @@ -79,7 +79,7 @@ final class KQueueEventArray { void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) { reallocIfNeeded(); - evSet(getKEventOffset(size++) + memoryAddress, ch, ch.socket.intValue(), filter, flags, fflags); + evSet(getKEventOffset(size++) + memoryAddress, ch.socket.intValue(), filter, flags, fflags); } private void reallocIfNeeded() { @@ -165,16 +165,9 @@ final class KQueueEventArray { return memory.getLong(getKEventOffset(index) + KQUEUE_DATA_OFFSET); } - AbstractKQueueChannel channel(int index) { - return getChannel(getKEventOffsetAddress(index)); - } - private static int calculateBufferCapacity(int capacity) { return capacity * KQUEUE_EVENT_SIZE; } - private static native void evSet(long keventAddress, AbstractKQueueChannel ch, - int ident, short filter, short flags, int fflags); - private static native AbstractKQueueChannel getChannel(long keventAddress); - static native void deleteGlobalRefs(long channelAddressStart, long channelAddressEnd); + private static native void evSet(long keventAddress, int ident, short filter, short flags, int fflags); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index 8f2a4ca45d..25c50a4f05 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -23,6 +23,8 @@ import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.IovArray; import io.netty.util.IntSupplier; +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.collection.IntObjectMap; import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PlatformDependent; @@ -31,11 +33,9 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.util.Queue; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import static io.netty.channel.kqueue.KQueueEventArray.deleteGlobalRefs; import static java.lang.Math.min; /** @@ -53,7 +53,6 @@ final class KQueueEventLoop extends SingleThreadEventLoop { KQueue.ensureAvailability(); } - private final NativeLongArray jniChannelPointers; private final boolean allowGrowing; private final FileDescriptor kqueueFd; private final KQueueEventArray changeList; @@ -66,6 +65,7 @@ final class KQueueEventLoop extends SingleThreadEventLoop { return kqueueWaitNow(); } }; + private final IntObjectMap channels = new IntObjectHashMap(4096); private volatile int wakenUp; private volatile int ioRatio = 50; @@ -83,7 +83,6 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } changeList = new KQueueEventArray(maxEvents); eventList = new KQueueEventArray(maxEvents); - jniChannelPointers = new NativeLongArray(4096); int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT); if (result < 0) { cleanup(); @@ -91,18 +90,18 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } } + void add(AbstractKQueueChannel ch) { + assert inEventLoop(); + channels.put(ch.fd().intValue(), ch); + } + void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) { changeList.evSet(ch, filter, flags, fflags); } - void remove(AbstractKQueueChannel ch) throws IOException { + void remove(AbstractKQueueChannel ch) { assert inEventLoop(); - if (ch.jniSelfPtr == 0) { - return; - } - - jniChannelPointers.add(ch.jniSelfPtr); - ch.jniSelfPtr = 0; + channels.remove(ch.fd().intValue()); } /** @@ -145,32 +144,25 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException { - deleteJniChannelPointers(); int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs); changeList.clear(); return numEvents; } - private void deleteJniChannelPointers() { - if (!jniChannelPointers.isEmpty()) { - deleteGlobalRefs(jniChannelPointers.memoryAddress(), jniChannelPointers.memoryAddressEnd()); - jniChannelPointers.clear(); - } - } - private void processReady(int ready) { for (int i = 0; i < ready; ++i) { final short filter = eventList.filter(i); final short flags = eventList.flags(i); + final int fd = eventList.fd(i); if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) { // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then // we later attempt to delete the filters from kqueue. assert filter != Native.EVFILT_USER || - (filter == Native.EVFILT_USER && eventList.fd(i) == KQUEUE_WAKE_UP_IDENT); + (filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT); continue; } - AbstractKQueueChannel channel = eventList.channel(i); + AbstractKQueueChannel channel = channels.get(fd); if (channel == null) { // This may happen if the channel has already been closed, and it will be removed from kqueue anyways. // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and @@ -327,12 +319,6 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } } finally { // Cleanup all native memory! - - // The JNI channel pointers should already be deleted because we should wait on kevent before this method, - // but lets just be sure we cleanup native memory. - deleteJniChannelPointers(); - jniChannelPointers.free(); - changeList.free(); eventList.free(); }