From fe2dd973e9cc537027f75cefa447a6d8ebe5695d Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Wed, 2 Aug 2017 00:15:18 -0700 Subject: [PATCH] Unify KQueue and Epoll wait timeout approach Motivation: KQueueEventLoop and EpollEventLoop implement different approaches to applying a timeout to their respective poll calls. Epoll attempts to ensure the desired timeout is satisfied at the Java layer and at the JNI layer, but it should be sufficient to account for spurious wakeups at the JNI layer. Epoll timeout granularity is also limited to milliseconds which may be too large for some latency-sensitive applications. Modifications: - Make EpollEventLoop wait method look like KQueueEventLoop - Epoll should support a finer timeout granularity via timerfd_create. We can hide most of these details behind the epollWait0 JNI call to avoid crossing additional JNI boundaries. Result: More consistent timeout approach between KQueue and Epoll. --- .../src/main/c/netty_epoll_native.c | 115 +++++++++--------- .../netty/channel/epoll/EpollEventLoop.java | 85 +++++++------ .../java/io/netty/channel/epoll/Native.java | 14 ++- .../src/main/c/netty_kqueue_native.c | 7 +- .../netty/channel/kqueue/KQueueEventLoop.java | 7 +- 5 files changed, 120 insertions(+), 108 deletions(-) diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index f3b7def978..1f8df12c99 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -50,16 +51,6 @@ #define TCP_FASTOPEN 23 #endif -/** - * On older Linux kernels, epoll can't handle timeout - * values bigger than (LONG_MAX - 999ULL)/HZ. 
- * - * See: - * - https://github.com/libevent/libevent/blob/master/epoll.c#L138 - * - http://cvs.schmorp.de/libev/ev_epoll.c?revision=1.68&view=markup - */ -#define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000) - // optional extern int epoll_create1(int flags) __attribute__((weak)); @@ -86,8 +77,6 @@ jfieldID packetPortFieldId = NULL; jfieldID packetMemoryAddressFieldId = NULL; jfieldID packetCountFieldId = NULL; -clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock - // util methods static int getSysctlValue(const char * property, int* returnValue) { int rc = -1; @@ -117,18 +106,25 @@ static jint netty_epoll_native_eventFd(JNIEnv* env, jclass clazz) { jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (eventFD < 0) { - int err = errno; - netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", err); + netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", errno); } return eventFD; } +static jint netty_epoll_native_timerFd(JNIEnv* env, jclass clazz) { + jint timerFD = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); + + if (timerFD < 0) { + netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_create() failed: ", errno); + } + return timerFD; +} + static void netty_epoll_native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) { jint eventFD = eventfd_write(fd, (eventfd_t) value); if (eventFD < 0) { - int err = errno; - netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write() failed: ", err); + netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write() failed: ", errno); } } @@ -141,6 +137,15 @@ static void netty_epoll_native_eventFdRead(JNIEnv* env, jclass clazz, jint fd) { } } +static void netty_epoll_native_timerFdRead(JNIEnv* env, jclass clazz, jint fd) { + uint64_t timerFireCount; + + if (read(fd, &timerFireCount, sizeof(uint64_t)) < 0) { + // it is expected that this is only called where there is known to be activity, so this is an error. 
+ netty_unix_errors_throwChannelExceptionErrorNo(env, "read() failed: ", errno); + } +} + static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) { jint efd; if (epoll_create1) { @@ -169,42 +174,44 @@ static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) { return efd; } -static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) { +static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) { struct epoll_event *ev = (struct epoll_event*) (intptr_t) address; - struct timespec ts; - jlong timeBeforeWait, timeNow; - int timeDiff, result, err; + int result, err; - if (timeout > MAX_EPOLL_TIMEOUT_MSEC) { - // Workaround for bug in older linux kernels that can not handle bigger timeout then MAX_EPOLL_TIMEOUT_MSEC. - timeout = MAX_EPOLL_TIMEOUT_MSEC; - } - - clock_gettime(waitClockId, &ts); - timeBeforeWait = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; - - for (;;) { - result = epoll_wait(efd, ev, len, timeout); - if (result >= 0) { - return result; - } - if ((err = errno) == EINTR) { - if (timeout > 0) { - clock_gettime(waitClockId, &ts); - timeNow = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; - timeDiff = timeNow - timeBeforeWait; - timeout -= timeDiff; - if (timeDiff < 0 || timeout <= 0) { - return 0; - } - timeBeforeWait = timeNow; - } else if (timeout == 0) { - return 0; + if (tvSec == 0 && tvNsec == 0) { + // Zeros = poll (aka return immediately). 
+ do { + result = epoll_wait(efd, ev, len, 0); + if (result >= 0) { + return result; + } + } while((err = errno) == EINTR); + } else { + struct itimerspec ts; + memset(&ts.it_interval, 0, sizeof(struct timespec)); + ts.it_value.tv_sec = tvSec; + ts.it_value.tv_nsec = tvNsec; + if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) { + netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_settime() failed: ", errno); + return -1; } - } else { - return -err; - } + do { + result = epoll_wait(efd, ev, len, -1); + if (result > 0) { + // Detect timeout, and preserve the epoll_wait API. + if (result == 1 && ev[0].data.fd == timerFd) { + // We assume that timerFD is in ET mode. So we must consume this event to ensure we are notified + // of future timer events because ET mode only notifies a single time until the event is consumed. + uint64_t timerFireCount; + // We don't care what the result is. We just want to consume the wakeup event and reset ET. + result = read(timerFd, &timerFireCount, sizeof(uint64_t)); + return 0; + } + return result; + } + } while((err = errno) == EINTR); } + return -err; } static jint netty_epoll_native_epollCtlAdd0(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags) { @@ -311,8 +318,7 @@ static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) { if (res == 0) { return (*env)->NewStringUTF(env, name.release); } - int err = errno; - netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", err); + netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", errno); return NULL; } @@ -409,10 +415,12 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = { static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]); static const JNINativeMethod fixed_method_table[] = { { "eventFd", "()I", (void *) netty_epoll_native_eventFd }, + { "timerFd", "()I", (void *) 
netty_epoll_native_timerFd }, { "eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite }, { "eventFdRead", "(I)V", (void *) netty_epoll_native_eventFdRead }, + { "timerFdRead", "(I)V", (void *) netty_epoll_native_timerFdRead }, { "epollCreate", "()I", (void *) netty_epoll_native_epollCreate }, - { "epollWait0", "(IJII)I", (void *) netty_epoll_native_epollWait0 }, + { "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, { "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 }, { "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 }, { "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 }, @@ -572,11 +580,6 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix return JNI_ERR; } - if (!netty_unix_util_initialize_wait_clock(&waitClockId)) { - fprintf(stderr, "FATAL: could not find a clock for clock_gettime!\n"); - return JNI_ERR; - } - return NETTY_JNI_VERSION; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 16f0adff74..b7d02283e7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -39,6 +39,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import static java.lang.Math.min; + /** * {@link EventLoop} which uses epoll under the covers. Only works on Linux! 
*/ @@ -55,6 +57,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { private final FileDescriptor epollFd; private final FileDescriptor eventFd; + private final FileDescriptor timerFd; private final IntObjectMap channels = new IntObjectHashMap(4096); private final boolean allowGrowing; private final EpollEventArray events; @@ -63,7 +66,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { - return Native.epollWait(epollFd.intValue(), events, 0); + return epollWaitNow(); } }; private final Callable pendingTasksCallable = new Callable() { @@ -89,6 +92,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { boolean success = false; FileDescriptor epollFd = null; FileDescriptor eventFd = null; + FileDescriptor timerFd = null; try { this.epollFd = epollFd = Native.newEpollCreate(); this.eventFd = eventFd = Native.newEventFd(); @@ -97,6 +101,12 @@ final class EpollEventLoop extends SingleThreadEventLoop { } catch (IOException e) { throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e); } + this.timerFd = timerFd = Native.newTimerFd(); + try { + Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET); + } catch (IOException e) { + throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e); + } success = true; } finally { if (!success) { @@ -114,6 +124,13 @@ final class EpollEventLoop extends SingleThreadEventLoop { // ignore } } + if (timerFd != null) { + try { + timerFd.close(); + } catch (Exception e) { + // ignore + } + } } } } @@ -204,43 +221,23 @@ final class EpollEventLoop extends SingleThreadEventLoop { this.ioRatio = ioRatio; } - private int epollWait(boolean oldWakenUp) throws IOException { - int selectCnt = 0; - long currentTimeNanos = System.nanoTime(); - long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); - 
for (;;) { - long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; - if (timeoutMillis <= 0) { - if (selectCnt == 0) { - int ready = Native.epollWait(epollFd.intValue(), events, 0); - if (ready > 0) { - return ready; - } - } - break; - } - - // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. - // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended - // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed - // in pipeline. - if (hasTasks() && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) { - return Native.epollWait(epollFd.intValue(), events, 0); - } - - int selectedKeys = Native.epollWait(epollFd.intValue(), events, (int) timeoutMillis); - selectCnt ++; - - if (selectedKeys != 0 || oldWakenUp || wakenUp == 1 || hasTasks() || hasScheduledTasks()) { - // - Selected something, - // - waken up by user, or - // - the task queue has a pending task. - // - a scheduled task is ready for processing - return selectedKeys; - } - currentTimeNanos = System.nanoTime(); + private int epollWait(boolean oldWakeup) throws IOException { + // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. + // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended + // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed + // in pipeline. 
+ if (oldWakeup && hasTasks()) { + return epollWaitNow(); } - return 0; + + long totalDelay = delayNanos(System.nanoTime()); + int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE); + return Native.epollWait(epollFd, events, timerFd, delaySeconds, + (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE)); + } + + private int epollWaitNow() throws IOException { + return Native.epollWait(epollFd, events, timerFd, 0, 0); } @Override @@ -347,7 +344,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { private void closeAll() { try { - Native.epollWait(epollFd.intValue(), events, 0); + epollWaitNow(); } catch (IOException ignore) { // ignore on close } @@ -368,8 +365,11 @@ final class EpollEventLoop extends SingleThreadEventLoop { for (int i = 0; i < ready; i ++) { final int fd = events.fd(i); if (fd == eventFd.intValue()) { - // consume wakeup event - Native.eventFdRead(eventFd.intValue()); + // consume wakeup event. + Native.eventFdRead(fd); + } else if (fd == timerFd.intValue()) { + // consume wakeup event, necessary because the timer is added with ET mode. 
+ Native.timerFdRead(fd); } else { final long ev = events.events(i); @@ -438,6 +438,11 @@ final class EpollEventLoop extends SingleThreadEventLoop { } catch (IOException e) { logger.warn("Failed to close the event fd.", e); } + try { + timerFd.close(); + } catch (IOException e) { + logger.warn("Failed to close the timer fd.", e); + } } finally { // release native memory iovArray.release(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 423d30c7d0..f621526238 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -97,9 +97,15 @@ public final class Native { return new FileDescriptor(eventFd()); } + public static FileDescriptor newTimerFd() { + return new FileDescriptor(timerFd()); + } + private static native int eventFd(); + private static native int timerFd(); public static native void eventFdWrite(int fd, long value); public static native void eventFdRead(int fd); + static native void timerFdRead(int fd); public static FileDescriptor newEpollCreate() { return new FileDescriptor(epollCreate()); @@ -107,14 +113,16 @@ public final class Native { private static native int epollCreate(); - public static int epollWait(int efd, EpollEventArray events, int timeout) throws IOException { - int ready = epollWait0(efd, events.memoryAddress(), events.length(), timeout); + public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd, + int timeoutSec, int timeoutNs) throws IOException { + int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(), + timeoutSec, timeoutNs); if (ready < 0) { throw newIOException("epoll_wait", ready); } return ready; } - private static native int epollWait0(int efd, long address, int len, int timeout); + private static native int 
epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs); public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException { int res = epollCtlAdd0(efd, fd, flags); diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_native.c b/transport-native-kqueue/src/main/c/netty_kqueue_native.c index b1a65a9333..710b7b31c6 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_native.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_native.c @@ -109,11 +109,10 @@ static jint netty_kqueue_native_keventWait(JNIEnv* env, jclass clazz, jint kqueu timeoutTs.tv_sec = tvSec; timeoutTs.tv_nsec = tvNsec; - // Negatives = wait indefinitely, Zeros = poll (aka return immediately). - if ((tvSec == 0 && tvNsec == 0) || tvSec < 0 || tvNsec < 0) { - const struct timespec* fixedTs = (tvSec == 0 && tvNsec == 0) ? &timeoutTs : NULL; + if (tvSec == 0 && tvNsec == 0) { + // Zeros = poll (aka return immediately). for (;;) { - result = kevent(kqueueFd, changeList, changeListLength, eventList, eventListLength, fixedTs); + result = kevent(kqueueFd, changeList, changeListLength, eventList, eventListLength, &timeoutTs); if (result >= 0) { return result; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index 9fbd0b26aa..5af59ba912 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -133,12 +133,9 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } private int kqueueWait(boolean oldWakeup) throws IOException { - // TODO(scott): do we need to loop here ... we already loop in keventWait to ensure we wait the expected time. - // We also do the same thing in EPOLL ... do we need to loop there? 
- // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. - // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended - // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed + // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended + // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed // in pipeline. if (oldWakeup && hasTasks()) { return kqueueWaitNow();