Fix ClassCastException and native crash when using kqueue transport. (#8665)
Motivation: How we did the mapping from native code to AbstractKQueueChannel was not safe and could lead to heap corruption. This then sometimes produced ClassCastExceptions or could also lead to crashes. This happened sometimes when running the testsuite. Modifications: Use a Map for the mapping (just as we do in the native epoll transport). Result: No more heap corruption / crashes.
This commit is contained in:
parent
b12c3311ba
commit
5ecb34ee72
@ -24,104 +24,26 @@
|
|||||||
#include "netty_unix_jni.h"
|
#include "netty_unix_jni.h"
|
||||||
#include "netty_unix_util.h"
|
#include "netty_unix_util.h"
|
||||||
|
|
||||||
static jfieldID kqueueJniPtrFieldId = NULL;
|
static void netty_kqueue_eventarray_evSet(JNIEnv* env, jclass clzz, jlong keventAddress, jint ident, jshort filter, jshort flags, jint fflags) {
|
||||||
|
EV_SET((struct kevent*) keventAddress, ident, filter, flags, fflags, 0, NULL);
|
||||||
static void netty_kqueue_eventarray_evSet(JNIEnv* env, jclass clzz, jlong keventAddress, jobject channel, jint ident, jshort filter, jshort flags, jint fflags) {
|
|
||||||
// Create a global pointer, cast it as a long, and retain it in java to re-use and free later.
|
|
||||||
jlong jniSelfPtr = (*env)->GetLongField(env, channel, kqueueJniPtrFieldId);
|
|
||||||
if (jniSelfPtr == 0) {
|
|
||||||
jniSelfPtr = (jlong) (*env)->NewGlobalRef(env, channel);
|
|
||||||
(*env)->SetLongField(env, channel, kqueueJniPtrFieldId, jniSelfPtr);
|
|
||||||
} else if ((flags & EV_DELETE) != 0) {
|
|
||||||
// If the event is deleted, make sure it no longer has a reference to the jniSelfPtr because it shouldn't be used after this point.
|
|
||||||
jniSelfPtr = 0;
|
|
||||||
}
|
|
||||||
EV_SET((struct kevent*) keventAddress, ident, filter, flags, fflags, 0, (jobject) jniSelfPtr);
|
|
||||||
}
|
|
||||||
|
|
||||||
static jobject netty_kqueue_eventarray_getChannel(JNIEnv* env, jclass clazz, jlong keventAddress) {
|
|
||||||
struct kevent* event = (struct kevent*) keventAddress;
|
|
||||||
return event->udata == NULL ? NULL : (jobject) event->udata;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void netty_kqueue_eventarray_deleteGlobalRefs(JNIEnv* env, jclass clazz, jlong channelAddressStart, jlong channelAddressEnd) {
|
|
||||||
// Iterate over an array of longs, which are really pointers to the jobject NewGlobalRef created above in evSet
|
|
||||||
// and delete each one. The field has already been set to 0 in java.
|
|
||||||
jlong* itr = (jlong*) channelAddressStart;
|
|
||||||
const jlong* end = (jlong*) channelAddressEnd;
|
|
||||||
for (; itr != end; ++itr) {
|
|
||||||
(*env)->DeleteGlobalRef(env, (jobject) *itr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// JNI Method Registration Table Begin
|
// JNI Method Registration Table Begin
|
||||||
static const JNINativeMethod fixed_method_table[] = {
|
static const JNINativeMethod fixed_method_table[] = {
|
||||||
{ "deleteGlobalRefs", "(JJ)V", (void *) netty_kqueue_eventarray_deleteGlobalRefs }
|
{ "evSet", "(JISSI)V", (void *) netty_kqueue_eventarray_evSet }
|
||||||
// "evSet" has a dynamic signature
|
|
||||||
// "getChannel" has a dynamic signature
|
|
||||||
};
|
};
|
||||||
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);
|
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);
|
||||||
|
|
||||||
static jint dynamicMethodsTableSize() {
|
|
||||||
return fixed_method_table_size + 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
|
|
||||||
JNINativeMethod* dynamicMethods = malloc(sizeof(JNINativeMethod) * dynamicMethodsTableSize());
|
|
||||||
memcpy(dynamicMethods, fixed_method_table, sizeof(fixed_method_table));
|
|
||||||
char* dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/kqueue/AbstractKQueueChannel;ISSI)V");
|
|
||||||
JNINativeMethod* dynamicMethod = &dynamicMethods[fixed_method_table_size];
|
|
||||||
dynamicMethod->name = "evSet";
|
|
||||||
dynamicMethod->signature = netty_unix_util_prepend("(JL", dynamicTypeName);
|
|
||||||
dynamicMethod->fnPtr = (void *) netty_kqueue_eventarray_evSet;
|
|
||||||
free(dynamicTypeName);
|
|
||||||
|
|
||||||
++dynamicMethod;
|
|
||||||
dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/kqueue/AbstractKQueueChannel;");
|
|
||||||
dynamicMethod->name = "getChannel";
|
|
||||||
dynamicMethod->signature = netty_unix_util_prepend("(J)L", dynamicTypeName);
|
|
||||||
dynamicMethod->fnPtr = (void *) netty_kqueue_eventarray_getChannel;
|
|
||||||
free(dynamicTypeName);
|
|
||||||
return dynamicMethods;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void freeDynamicMethodsTable(JNINativeMethod* dynamicMethods) {
|
|
||||||
jint fullMethodTableSize = dynamicMethodsTableSize();
|
|
||||||
jint i = fixed_method_table_size;
|
|
||||||
for (; i < fullMethodTableSize; ++i) {
|
|
||||||
free(dynamicMethods[i].signature);
|
|
||||||
}
|
|
||||||
free(dynamicMethods);
|
|
||||||
}
|
|
||||||
// JNI Method Registration Table End
|
// JNI Method Registration Table End
|
||||||
|
|
||||||
jint netty_kqueue_eventarray_JNI_OnLoad(JNIEnv* env, const char* packagePrefix) {
|
jint netty_kqueue_eventarray_JNI_OnLoad(JNIEnv* env, const char* packagePrefix) {
|
||||||
JNINativeMethod* dynamicMethods = createDynamicMethodsTable(packagePrefix);
|
|
||||||
if (netty_unix_util_register_natives(env,
|
if (netty_unix_util_register_natives(env,
|
||||||
packagePrefix,
|
packagePrefix,
|
||||||
"io/netty/channel/kqueue/KQueueEventArray",
|
"io/netty/channel/kqueue/KQueueEventArray",
|
||||||
dynamicMethods,
|
fixed_method_table,
|
||||||
dynamicMethodsTableSize()) != 0) {
|
fixed_method_table_size) != 0) {
|
||||||
freeDynamicMethodsTable(dynamicMethods);
|
|
||||||
return JNI_ERR;
|
return JNI_ERR;
|
||||||
}
|
}
|
||||||
freeDynamicMethodsTable(dynamicMethods);
|
|
||||||
dynamicMethods = NULL;
|
|
||||||
|
|
||||||
char* nettyClassName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/kqueue/AbstractKQueueChannel");
|
|
||||||
jclass kqueueChannelCls = (*env)->FindClass(env, nettyClassName);
|
|
||||||
free(nettyClassName);
|
|
||||||
nettyClassName = NULL;
|
|
||||||
if (kqueueChannelCls == NULL) {
|
|
||||||
return JNI_ERR;
|
|
||||||
}
|
|
||||||
|
|
||||||
kqueueJniPtrFieldId = (*env)->GetFieldID(env, kqueueChannelCls, "jniSelfPtr", "J");
|
|
||||||
if (kqueueJniPtrFieldId == NULL) {
|
|
||||||
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: AbstractKQueueChannel.jniSelfPtr");
|
|
||||||
return JNI_ERR;
|
|
||||||
}
|
|
||||||
|
|
||||||
return NETTY_JNI_VERSION;
|
return NETTY_JNI_VERSION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,15 +69,6 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
|
|||||||
private boolean writeFilterEnabled;
|
private boolean writeFilterEnabled;
|
||||||
boolean readReadyRunnablePending;
|
boolean readReadyRunnablePending;
|
||||||
boolean inputClosedSeenErrorOnRead;
|
boolean inputClosedSeenErrorOnRead;
|
||||||
/**
|
|
||||||
* This member variable means we don't have to have a map in {@link KQueueEventLoop} which associates the FDs
|
|
||||||
* from kqueue to instances of this class. This field will be initialized by JNI when modifying kqueue events.
|
|
||||||
* If there is no global reference when JNI gets a kqueue evSet call (aka this field is 0) then a global reference
|
|
||||||
* will be created and the address will be saved in this member variable. Then when we process a kevent in Java
|
|
||||||
* we can ask JNI to give us the {@link AbstractKQueueChannel} that corresponds to that event.
|
|
||||||
*/
|
|
||||||
long jniSelfPtr;
|
|
||||||
|
|
||||||
protected volatile boolean active;
|
protected volatile boolean active;
|
||||||
private volatile SocketAddress local;
|
private volatile SocketAddress local;
|
||||||
private volatile SocketAddress remote;
|
private volatile SocketAddress remote;
|
||||||
@ -213,6 +204,9 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
|
|||||||
// make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
|
// make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
|
||||||
// new EventLoop.
|
// new EventLoop.
|
||||||
readReadyRunnablePending = false;
|
readReadyRunnablePending = false;
|
||||||
|
|
||||||
|
((KQueueEventLoop) eventLoop()).add(this);
|
||||||
|
|
||||||
// Add the write event first so we get notified of connection refused on the client side!
|
// Add the write event first so we get notified of connection refused on the client side!
|
||||||
if (writeFilterEnabled) {
|
if (writeFilterEnabled) {
|
||||||
evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
|
evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
|
||||||
|
@ -79,7 +79,7 @@ final class KQueueEventArray {
|
|||||||
|
|
||||||
void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
|
void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
|
||||||
reallocIfNeeded();
|
reallocIfNeeded();
|
||||||
evSet(getKEventOffset(size++) + memoryAddress, ch, ch.socket.intValue(), filter, flags, fflags);
|
evSet(getKEventOffset(size++) + memoryAddress, ch.socket.intValue(), filter, flags, fflags);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void reallocIfNeeded() {
|
private void reallocIfNeeded() {
|
||||||
@ -165,16 +165,9 @@ final class KQueueEventArray {
|
|||||||
return memory.getLong(getKEventOffset(index) + KQUEUE_DATA_OFFSET);
|
return memory.getLong(getKEventOffset(index) + KQUEUE_DATA_OFFSET);
|
||||||
}
|
}
|
||||||
|
|
||||||
AbstractKQueueChannel channel(int index) {
|
|
||||||
return getChannel(getKEventOffsetAddress(index));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static int calculateBufferCapacity(int capacity) {
|
private static int calculateBufferCapacity(int capacity) {
|
||||||
return capacity * KQUEUE_EVENT_SIZE;
|
return capacity * KQUEUE_EVENT_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static native void evSet(long keventAddress, AbstractKQueueChannel ch,
|
private static native void evSet(long keventAddress, int ident, short filter, short flags, int fflags);
|
||||||
int ident, short filter, short flags, int fflags);
|
|
||||||
private static native AbstractKQueueChannel getChannel(long keventAddress);
|
|
||||||
static native void deleteGlobalRefs(long channelAddressStart, long channelAddressEnd);
|
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,8 @@ import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
|
|||||||
import io.netty.channel.unix.FileDescriptor;
|
import io.netty.channel.unix.FileDescriptor;
|
||||||
import io.netty.channel.unix.IovArray;
|
import io.netty.channel.unix.IovArray;
|
||||||
import io.netty.util.IntSupplier;
|
import io.netty.util.IntSupplier;
|
||||||
|
import io.netty.util.collection.IntObjectHashMap;
|
||||||
|
import io.netty.util.collection.IntObjectMap;
|
||||||
import io.netty.util.concurrent.RejectedExecutionHandler;
|
import io.netty.util.concurrent.RejectedExecutionHandler;
|
||||||
import io.netty.util.internal.ObjectUtil;
|
import io.netty.util.internal.ObjectUtil;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
@ -31,11 +33,9 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||||
|
|
||||||
import static io.netty.channel.kqueue.KQueueEventArray.deleteGlobalRefs;
|
|
||||||
import static java.lang.Math.min;
|
import static java.lang.Math.min;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -53,7 +53,6 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
|
|||||||
KQueue.ensureAvailability();
|
KQueue.ensureAvailability();
|
||||||
}
|
}
|
||||||
|
|
||||||
private final NativeLongArray jniChannelPointers;
|
|
||||||
private final boolean allowGrowing;
|
private final boolean allowGrowing;
|
||||||
private final FileDescriptor kqueueFd;
|
private final FileDescriptor kqueueFd;
|
||||||
private final KQueueEventArray changeList;
|
private final KQueueEventArray changeList;
|
||||||
@ -66,6 +65,7 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
|
|||||||
return kqueueWaitNow();
|
return kqueueWaitNow();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);
|
||||||
|
|
||||||
private volatile int wakenUp;
|
private volatile int wakenUp;
|
||||||
private volatile int ioRatio = 50;
|
private volatile int ioRatio = 50;
|
||||||
@ -83,7 +83,6 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
|
|||||||
}
|
}
|
||||||
changeList = new KQueueEventArray(maxEvents);
|
changeList = new KQueueEventArray(maxEvents);
|
||||||
eventList = new KQueueEventArray(maxEvents);
|
eventList = new KQueueEventArray(maxEvents);
|
||||||
jniChannelPointers = new NativeLongArray(4096);
|
|
||||||
int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
|
int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
|
||||||
if (result < 0) {
|
if (result < 0) {
|
||||||
cleanup();
|
cleanup();
|
||||||
@ -91,18 +90,18 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void add(AbstractKQueueChannel ch) {
|
||||||
|
assert inEventLoop();
|
||||||
|
channels.put(ch.fd().intValue(), ch);
|
||||||
|
}
|
||||||
|
|
||||||
void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
|
void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
|
||||||
changeList.evSet(ch, filter, flags, fflags);
|
changeList.evSet(ch, filter, flags, fflags);
|
||||||
}
|
}
|
||||||
|
|
||||||
void remove(AbstractKQueueChannel ch) throws IOException {
|
void remove(AbstractKQueueChannel ch) {
|
||||||
assert inEventLoop();
|
assert inEventLoop();
|
||||||
if (ch.jniSelfPtr == 0) {
|
channels.remove(ch.fd().intValue());
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
jniChannelPointers.add(ch.jniSelfPtr);
|
|
||||||
ch.jniSelfPtr = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -145,32 +144,25 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
|
private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
|
||||||
deleteJniChannelPointers();
|
|
||||||
int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
|
int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
|
||||||
changeList.clear();
|
changeList.clear();
|
||||||
return numEvents;
|
return numEvents;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteJniChannelPointers() {
|
|
||||||
if (!jniChannelPointers.isEmpty()) {
|
|
||||||
deleteGlobalRefs(jniChannelPointers.memoryAddress(), jniChannelPointers.memoryAddressEnd());
|
|
||||||
jniChannelPointers.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processReady(int ready) {
|
private void processReady(int ready) {
|
||||||
for (int i = 0; i < ready; ++i) {
|
for (int i = 0; i < ready; ++i) {
|
||||||
final short filter = eventList.filter(i);
|
final short filter = eventList.filter(i);
|
||||||
final short flags = eventList.flags(i);
|
final short flags = eventList.flags(i);
|
||||||
|
final int fd = eventList.fd(i);
|
||||||
if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
|
if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
|
||||||
// EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
|
// EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
|
||||||
// we later attempt to delete the filters from kqueue.
|
// we later attempt to delete the filters from kqueue.
|
||||||
assert filter != Native.EVFILT_USER ||
|
assert filter != Native.EVFILT_USER ||
|
||||||
(filter == Native.EVFILT_USER && eventList.fd(i) == KQUEUE_WAKE_UP_IDENT);
|
(filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
AbstractKQueueChannel channel = eventList.channel(i);
|
AbstractKQueueChannel channel = channels.get(fd);
|
||||||
if (channel == null) {
|
if (channel == null) {
|
||||||
// This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
|
// This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
|
||||||
// We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
|
// We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
|
||||||
@ -327,12 +319,6 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
|
|||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// Cleanup all native memory!
|
// Cleanup all native memory!
|
||||||
|
|
||||||
// The JNI channel pointers should already be deleted because we should wait on kevent before this method,
|
|
||||||
// but lets just be sure we cleanup native memory.
|
|
||||||
deleteJniChannelPointers();
|
|
||||||
jniChannelPointers.free();
|
|
||||||
|
|
||||||
changeList.free();
|
changeList.free();
|
||||||
eventList.free();
|
eventList.free();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user