diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index f3b7def978..1f8df12c99 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -50,16 +51,6 @@ #define TCP_FASTOPEN 23 #endif -/** - * On older Linux kernels, epoll can't handle timeout - * values bigger than (LONG_MAX - 999ULL)/HZ. - * - * See: - * - https://github.com/libevent/libevent/blob/master/epoll.c#L138 - * - http://cvs.schmorp.de/libev/ev_epoll.c?revision=1.68&view=markup - */ -#define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000) - // optional extern int epoll_create1(int flags) __attribute__((weak)); @@ -86,8 +77,6 @@ jfieldID packetPortFieldId = NULL; jfieldID packetMemoryAddressFieldId = NULL; jfieldID packetCountFieldId = NULL; -clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock - // util methods static int getSysctlValue(const char * property, int* returnValue) { int rc = -1; @@ -117,18 +106,25 @@ static jint netty_epoll_native_eventFd(JNIEnv* env, jclass clazz) { jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (eventFD < 0) { - int err = errno; - netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", err); + netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", errno); } return eventFD; } +static jint netty_epoll_native_timerFd(JNIEnv* env, jclass clazz) { + jint timerFD = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); + + if (timerFD < 0) { + netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_create() failed: ", errno); + } + return timerFD; +} + static void netty_epoll_native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) { jint eventFD = eventfd_write(fd, (eventfd_t) value); if (eventFD < 0) { - int err = errno; - netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write() failed: ", err); + netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write() failed: ", errno); } } @@ -141,6 +137,15 @@ static void netty_epoll_native_eventFdRead(JNIEnv* env, jclass clazz, jint fd) { } } +static void netty_epoll_native_timerFdRead(JNIEnv* env, jclass clazz, jint fd) { + uint64_t timerFireCount; + + if (read(fd, &timerFireCount, sizeof(uint64_t)) < 0) { + // it is expected that this is only called where there is known to be activity, so this is an error. + netty_unix_errors_throwChannelExceptionErrorNo(env, "read() failed: ", errno); + } +} + static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) { jint efd; if (epoll_create1) { @@ -169,42 +174,44 @@ static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) { return efd; } -static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) { +static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) { struct epoll_event *ev = (struct epoll_event*) (intptr_t) address; - struct timespec ts; - jlong timeBeforeWait, timeNow; - int timeDiff, result, err; + int result, err; - if (timeout > MAX_EPOLL_TIMEOUT_MSEC) { - // Workaround for bug in older linux kernels that can not handle bigger timeout then MAX_EPOLL_TIMEOUT_MSEC. - timeout = MAX_EPOLL_TIMEOUT_MSEC; - } - - clock_gettime(waitClockId, &ts); - timeBeforeWait = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; - - for (;;) { - result = epoll_wait(efd, ev, len, timeout); - if (result >= 0) { - return result; - } - if ((err = errno) == EINTR) { - if (timeout > 0) { - clock_gettime(waitClockId, &ts); - timeNow = ts.tv_sec * 1000 + ts.tv_nsec / 1000000; - timeDiff = timeNow - timeBeforeWait; - timeout -= timeDiff; - if (timeDiff < 0 || timeout <= 0) { - return 0; - } - timeBeforeWait = timeNow; - } else if (timeout == 0) { - return 0; + if (tvSec == 0 && tvNsec == 0) { + // Zeros = poll (aka return immediately). + do { + result = epoll_wait(efd, ev, len, 0); + if (result >= 0) { + return result; + } + } while((err = errno) == EINTR); + } else { + struct itimerspec ts; + memset(&ts.it_interval, 0, sizeof(struct timespec)); + ts.it_value.tv_sec = tvSec; + ts.it_value.tv_nsec = tvNsec; + if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) { + netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_settime() failed: ", errno); + return -1; } - } else { - return -err; - } + do { + result = epoll_wait(efd, ev, len, -1); + if (result > 0) { + // Detect timeout, and preserve the epoll_wait API. + if (result == 1 && ev[0].data.fd == timerFd) { + // We assume that timerFD is in ET mode. So we must consume this event to ensure we are notified + // of future timer events because ET mode only notifies a single time until the event is consumed. + uint64_t timerFireCount; + // We don't care what the result is. We just want to consume the wakeup event and reset ET. + result = read(timerFd, &timerFireCount, sizeof(uint64_t)); + return 0; + } + return result; + } + } while((err = errno) == EINTR); } + return -err; } static jint netty_epoll_native_epollCtlAdd0(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags) { @@ -311,8 +318,7 @@ static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) { if (res == 0) { return (*env)->NewStringUTF(env, name.release); } - int err = errno; - netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", err); + netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", errno); return NULL; } @@ -409,10 +415,12 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = { static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]); static const JNINativeMethod fixed_method_table[] = { { "eventFd", "()I", (void *) netty_epoll_native_eventFd }, + { "timerFd", "()I", (void *) netty_epoll_native_timerFd }, { "eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite }, { "eventFdRead", "(I)V", (void *) netty_epoll_native_eventFdRead }, + { "timerFdRead", "(I)V", (void *) netty_epoll_native_timerFdRead }, { "epollCreate", "()I", (void *) netty_epoll_native_epollCreate }, - { "epollWait0", "(IJII)I", (void *) netty_epoll_native_epollWait0 }, + { "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, { "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 }, { "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 }, { "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 }, @@ -572,11 +580,6 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix return JNI_ERR; } - if (!netty_unix_util_initialize_wait_clock(&waitClockId)) { - fprintf(stderr, "FATAL: could not find a clock for clock_gettime!\n"); - return JNI_ERR; - } - return NETTY_JNI_VERSION; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 16f0adff74..b7d02283e7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -39,6 +39,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import static java.lang.Math.min; + /** * {@link EventLoop} which uses epoll under the covers. Only works on Linux! */ @@ -55,6 +57,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { private final FileDescriptor epollFd; private final FileDescriptor eventFd; + private final FileDescriptor timerFd; private final IntObjectMap channels = new IntObjectHashMap(4096); private final boolean allowGrowing; private final EpollEventArray events; @@ -63,7 +66,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { - return Native.epollWait(epollFd.intValue(), events, 0); + return epollWaitNow(); } }; private final Callable pendingTasksCallable = new Callable() { @@ -89,6 +92,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { boolean success = false; FileDescriptor epollFd = null; FileDescriptor eventFd = null; + FileDescriptor timerFd = null; try { this.epollFd = epollFd = Native.newEpollCreate(); this.eventFd = eventFd = Native.newEventFd(); @@ -97,6 +101,12 @@ final class EpollEventLoop extends SingleThreadEventLoop { } catch (IOException e) { throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e); } + this.timerFd = timerFd = Native.newTimerFd(); + try { + Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET); + } catch (IOException e) { + throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e); + } success = true; } finally { if (!success) { @@ -114,6 +124,13 @@ final class EpollEventLoop extends SingleThreadEventLoop { // ignore } } + if (timerFd != null) { + try { + timerFd.close(); + } catch (Exception e) { + // ignore + } + } } } } @@ -204,43 +221,23 @@ final class EpollEventLoop extends SingleThreadEventLoop { this.ioRatio = ioRatio; } - private int epollWait(boolean oldWakenUp) throws IOException { - int selectCnt = 0; - long currentTimeNanos = System.nanoTime(); - long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); - for (;;) { - long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; - if (timeoutMillis <= 0) { - if (selectCnt == 0) { - int ready = Native.epollWait(epollFd.intValue(), events, 0); - if (ready > 0) { - return ready; - } - } - break; - } - - // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. - // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended - // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed - // in pipeline. - if (hasTasks() && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) { - return Native.epollWait(epollFd.intValue(), events, 0); - } - - int selectedKeys = Native.epollWait(epollFd.intValue(), events, (int) timeoutMillis); - selectCnt ++; - - if (selectedKeys != 0 || oldWakenUp || wakenUp == 1 || hasTasks() || hasScheduledTasks()) { - // - Selected something, - // - waken up by user, or - // - the task queue has a pending task. - // - a scheduled task is ready for processing - return selectedKeys; - } - currentTimeNanos = System.nanoTime(); + private int epollWait(boolean oldWakeup) throws IOException { + // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. + // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended + // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed + // in pipeline. + if (oldWakeup && hasTasks()) { + return epollWaitNow(); } - return 0; + + long totalDelay = delayNanos(System.nanoTime()); + int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE); + return Native.epollWait(epollFd, events, timerFd, delaySeconds, + (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE)); + } + + private int epollWaitNow() throws IOException { + return Native.epollWait(epollFd, events, timerFd, 0, 0); } @Override @@ -347,7 +344,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { private void closeAll() { try { - Native.epollWait(epollFd.intValue(), events, 0); + epollWaitNow(); } catch (IOException ignore) { // ignore on close } @@ -368,8 +365,11 @@ final class EpollEventLoop extends SingleThreadEventLoop { for (int i = 0; i < ready; i ++) { final int fd = events.fd(i); if (fd == eventFd.intValue()) { - // consume wakeup event - Native.eventFdRead(eventFd.intValue()); + // consume wakeup event. + Native.eventFdRead(fd); + } else if (fd == timerFd.intValue()) { + // consume wakeup event, necessary because the timer is added with ET mode. + Native.timerFdRead(fd); } else { final long ev = events.events(i); @@ -438,6 +438,11 @@ final class EpollEventLoop extends SingleThreadEventLoop { } catch (IOException e) { logger.warn("Failed to close the event fd.", e); } + try { + timerFd.close(); + } catch (IOException e) { + logger.warn("Failed to close the timer fd.", e); + } } finally { // release native memory iovArray.release(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 423d30c7d0..f621526238 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -97,9 +97,15 @@ public final class Native { return new FileDescriptor(eventFd()); } + public static FileDescriptor newTimerFd() { + return new FileDescriptor(timerFd()); + } + private static native int eventFd(); + private static native int timerFd(); public static native void eventFdWrite(int fd, long value); public static native void eventFdRead(int fd); + static native void timerFdRead(int fd); public static FileDescriptor newEpollCreate() { return new FileDescriptor(epollCreate()); @@ -107,14 +113,16 @@ public final class Native { private static native int epollCreate(); - public static int epollWait(int efd, EpollEventArray events, int timeout) throws IOException { - int ready = epollWait0(efd, events.memoryAddress(), events.length(), timeout); + public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd, + int timeoutSec, int timeoutNs) throws IOException { + int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(), + timeoutSec, timeoutNs); if (ready < 0) { throw newIOException("epoll_wait", ready); } return ready; } - private static native int epollWait0(int efd, long address, int len, int timeout); + private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs); public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException { int res = epollCtlAdd0(efd, fd, flags); diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_native.c b/transport-native-kqueue/src/main/c/netty_kqueue_native.c index b1a65a9333..710b7b31c6 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_native.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_native.c @@ -109,11 +109,10 @@ static jint netty_kqueue_native_keventWait(JNIEnv* env, jclass clazz, jint kqueu timeoutTs.tv_sec = tvSec; timeoutTs.tv_nsec = tvNsec; - // Negatives = wait indefinitely, Zeros = poll (aka return immediately). - if ((tvSec == 0 && tvNsec == 0) || tvSec < 0 || tvNsec < 0) { - const struct timespec* fixedTs = (tvSec == 0 && tvNsec == 0) ? &timeoutTs : NULL; + if (tvSec == 0 && tvNsec == 0) { + // Zeros = poll (aka return immediately). for (;;) { - result = kevent(kqueueFd, changeList, changeListLength, eventList, eventListLength, fixedTs); + result = kevent(kqueueFd, changeList, changeListLength, eventList, eventListLength, &timeoutTs); if (result >= 0) { return result; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index 9fbd0b26aa..5af59ba912 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -133,12 +133,9 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } private int kqueueWait(boolean oldWakeup) throws IOException { - // TODO(scott): do we need to loop here ... we already loop in keventWait to ensure we wait the expected time. - // We also do the same thing in EPOLL ... do we need to loop there? - // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. - // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended - // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed + // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended + // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed // in pipeline. if (oldWakeup && hasTasks()) { return kqueueWaitNow();