Unify KQueue and Epoll wait timeout approach

Motivation:
KQueueEventLoop and EpollEventLoop implement different approaches to applying a timeout of their respective poll calls. Epoll attempts to ensure the desired timeout is satisfied at the java layer and at the JNI layer, but it should be sufficient to account for spurious wakups at the JNI layer. Epoll timeout granularity is also limited to milliseconds which may be too large for some latency sensitive applications.

Modifications:
- Make EpollEventLoop wait method look like KQueueEventLoop
- Epoll should support a finer timeout granularity via timerfd_create. We can hide most of these details behind the epollWait0 JNI call to avoid crossing additional JNI boundaries.

Result:
More consistent timeout approach between KQueue and Epoll.
This commit is contained in:
Scott Mitchell 2017-08-02 00:15:18 -07:00
parent 1d7c3fb7ee
commit fe2dd973e9
5 changed files with 120 additions and 108 deletions

View File

@ -27,6 +27,7 @@
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
@ -50,16 +51,6 @@
#define TCP_FASTOPEN 23
#endif
/**
* On older Linux kernels, epoll can't handle timeout
* values bigger than (LONG_MAX - 999ULL)/HZ.
*
* See:
* - https://github.com/libevent/libevent/blob/master/epoll.c#L138
* - http://cvs.schmorp.de/libev/ev_epoll.c?revision=1.68&view=markup
*/
#define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000)
// optional
extern int epoll_create1(int flags) __attribute__((weak));
@ -86,8 +77,6 @@ jfieldID packetPortFieldId = NULL;
jfieldID packetMemoryAddressFieldId = NULL;
jfieldID packetCountFieldId = NULL;
clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock
// util methods
static int getSysctlValue(const char * property, int* returnValue) {
int rc = -1;
@ -117,18 +106,25 @@ static jint netty_epoll_native_eventFd(JNIEnv* env, jclass clazz) {
jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (eventFD < 0) {
int err = errno;
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", err);
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", errno);
}
return eventFD;
}
static jint netty_epoll_native_timerFd(JNIEnv* env, jclass clazz) {
jint timerFD = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
if (timerFD < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_create() failed: ", errno);
}
return timerFD;
}
static void netty_epoll_native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) {
jint eventFD = eventfd_write(fd, (eventfd_t) value);
if (eventFD < 0) {
int err = errno;
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write() failed: ", err);
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write() failed: ", errno);
}
}
@ -141,6 +137,15 @@ static void netty_epoll_native_eventFdRead(JNIEnv* env, jclass clazz, jint fd) {
}
}
static void netty_epoll_native_timerFdRead(JNIEnv* env, jclass clazz, jint fd) {
uint64_t timerFireCount;
if (read(fd, &timerFireCount, sizeof(uint64_t)) < 0) {
// it is expected that this is only called where there is known to be activity, so this is an error.
netty_unix_errors_throwChannelExceptionErrorNo(env, "read() failed: ", errno);
}
}
static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) {
jint efd;
if (epoll_create1) {
@ -169,42 +174,44 @@ static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) {
return efd;
}
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) {
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
struct timespec ts;
jlong timeBeforeWait, timeNow;
int timeDiff, result, err;
int result, err;
if (timeout > MAX_EPOLL_TIMEOUT_MSEC) {
// Workaround for bug in older linux kernels that can not handle bigger timeout then MAX_EPOLL_TIMEOUT_MSEC.
timeout = MAX_EPOLL_TIMEOUT_MSEC;
}
clock_gettime(waitClockId, &ts);
timeBeforeWait = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
for (;;) {
result = epoll_wait(efd, ev, len, timeout);
if (result >= 0) {
return result;
}
if ((err = errno) == EINTR) {
if (timeout > 0) {
clock_gettime(waitClockId, &ts);
timeNow = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
timeDiff = timeNow - timeBeforeWait;
timeout -= timeDiff;
if (timeDiff < 0 || timeout <= 0) {
return 0;
}
timeBeforeWait = timeNow;
} else if (timeout == 0) {
return 0;
if (tvSec == 0 && tvNsec == 0) {
// Zeros = poll (aka return immediately).
do {
result = epoll_wait(efd, ev, len, 0);
if (result >= 0) {
return result;
}
} while((err = errno) == EINTR);
} else {
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
ts.it_value.tv_nsec = tvNsec;
if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_settime() failed: ", errno);
return -1;
}
} else {
return -err;
}
do {
result = epoll_wait(efd, ev, len, -1);
if (result > 0) {
// Detect timeout, and preserve the epoll_wait API.
if (result == 1 && ev[0].data.fd == timerFd) {
// We assume that timerFD is in ET mode. So we must consume this event to ensure we are notified
// of future timer events because ET mode only notifies a single time until the event is consumed.
uint64_t timerFireCount;
// We don't care what the result is. We just want to consume the wakeup event and reset ET.
result = read(timerFd, &timerFireCount, sizeof(uint64_t));
return 0;
}
return result;
}
} while((err = errno) == EINTR);
}
return -err;
}
static jint netty_epoll_native_epollCtlAdd0(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags) {
@ -311,8 +318,7 @@ static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) {
if (res == 0) {
return (*env)->NewStringUTF(env, name.release);
}
int err = errno;
netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", err);
netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", errno);
return NULL;
}
@ -409,10 +415,12 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);
static const JNINativeMethod fixed_method_table[] = {
{ "eventFd", "()I", (void *) netty_epoll_native_eventFd },
{ "timerFd", "()I", (void *) netty_epoll_native_timerFd },
{ "eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite },
{ "eventFdRead", "(I)V", (void *) netty_epoll_native_eventFdRead },
{ "timerFdRead", "(I)V", (void *) netty_epoll_native_timerFdRead },
{ "epollCreate", "()I", (void *) netty_epoll_native_epollCreate },
{ "epollWait0", "(IJII)I", (void *) netty_epoll_native_epollWait0 },
{ "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 },
{ "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 },
{ "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 },
{ "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 },
@ -572,11 +580,6 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix
return JNI_ERR;
}
if (!netty_unix_util_initialize_wait_clock(&waitClockId)) {
fprintf(stderr, "FATAL: could not find a clock for clock_gettime!\n");
return JNI_ERR;
}
return NETTY_JNI_VERSION;
}

View File

@ -39,6 +39,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static java.lang.Math.min;
/**
* {@link EventLoop} which uses epoll under the covers. Only works on Linux!
*/
@ -55,6 +57,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
private final FileDescriptor epollFd;
private final FileDescriptor eventFd;
private final FileDescriptor timerFd;
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
private final boolean allowGrowing;
private final EpollEventArray events;
@ -63,7 +66,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return Native.epollWait(epollFd.intValue(), events, 0);
return epollWaitNow();
}
};
private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
@ -89,6 +92,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
boolean success = false;
FileDescriptor epollFd = null;
FileDescriptor eventFd = null;
FileDescriptor timerFd = null;
try {
this.epollFd = epollFd = Native.newEpollCreate();
this.eventFd = eventFd = Native.newEventFd();
@ -97,6 +101,12 @@ final class EpollEventLoop extends SingleThreadEventLoop {
} catch (IOException e) {
throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
}
this.timerFd = timerFd = Native.newTimerFd();
try {
Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
} catch (IOException e) {
throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
}
success = true;
} finally {
if (!success) {
@ -114,6 +124,13 @@ final class EpollEventLoop extends SingleThreadEventLoop {
// ignore
}
}
if (timerFd != null) {
try {
timerFd.close();
} catch (Exception e) {
// ignore
}
}
}
}
}
@ -204,43 +221,23 @@ final class EpollEventLoop extends SingleThreadEventLoop {
this.ioRatio = ioRatio;
}
private int epollWait(boolean oldWakenUp) throws IOException {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
int ready = Native.epollWait(epollFd.intValue(), events, 0);
if (ready > 0) {
return ready;
}
}
break;
}
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
if (hasTasks() && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
return Native.epollWait(epollFd.intValue(), events, 0);
}
int selectedKeys = Native.epollWait(epollFd.intValue(), events, (int) timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp == 1 || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
return selectedKeys;
}
currentTimeNanos = System.nanoTime();
private int epollWait(boolean oldWakeup) throws IOException {
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
if (oldWakeup && hasTasks()) {
return epollWaitNow();
}
return 0;
long totalDelay = delayNanos(System.nanoTime());
int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
return Native.epollWait(epollFd, events, timerFd, delaySeconds,
(int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
}
private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, timerFd, 0, 0);
}
@Override
@ -347,7 +344,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
private void closeAll() {
try {
Native.epollWait(epollFd.intValue(), events, 0);
epollWaitNow();
} catch (IOException ignore) {
// ignore on close
}
@ -368,8 +365,11 @@ final class EpollEventLoop extends SingleThreadEventLoop {
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue()) {
// consume wakeup event
Native.eventFdRead(eventFd.intValue());
// consume wakeup event.
Native.eventFdRead(fd);
} else if (fd == timerFd.intValue()) {
// consume wakeup event, necessary because the timer is added with ET mode.
Native.timerFdRead(fd);
} else {
final long ev = events.events(i);
@ -438,6 +438,11 @@ final class EpollEventLoop extends SingleThreadEventLoop {
} catch (IOException e) {
logger.warn("Failed to close the event fd.", e);
}
try {
timerFd.close();
} catch (IOException e) {
logger.warn("Failed to close the timer fd.", e);
}
} finally {
// release native memory
iovArray.release();

View File

@ -97,9 +97,15 @@ public final class Native {
return new FileDescriptor(eventFd());
}
public static FileDescriptor newTimerFd() {
return new FileDescriptor(timerFd());
}
private static native int eventFd();
private static native int timerFd();
public static native void eventFdWrite(int fd, long value);
public static native void eventFdRead(int fd);
static native void timerFdRead(int fd);
public static FileDescriptor newEpollCreate() {
return new FileDescriptor(epollCreate());
@ -107,14 +113,16 @@ public final class Native {
private static native int epollCreate();
public static int epollWait(int efd, EpollEventArray events, int timeout) throws IOException {
int ready = epollWait0(efd, events.memoryAddress(), events.length(), timeout);
public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd,
int timeoutSec, int timeoutNs) throws IOException {
int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
timeoutSec, timeoutNs);
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
return ready;
}
private static native int epollWait0(int efd, long address, int len, int timeout);
private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs);
public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException {
int res = epollCtlAdd0(efd, fd, flags);

View File

@ -109,11 +109,10 @@ static jint netty_kqueue_native_keventWait(JNIEnv* env, jclass clazz, jint kqueu
timeoutTs.tv_sec = tvSec;
timeoutTs.tv_nsec = tvNsec;
// Negatives = wait indefinitely, Zeros = poll (aka return immediately).
if ((tvSec == 0 && tvNsec == 0) || tvSec < 0 || tvNsec < 0) {
const struct timespec* fixedTs = (tvSec == 0 && tvNsec == 0) ? &timeoutTs : NULL;
if (tvSec == 0 && tvNsec == 0) {
// Zeros = poll (aka return immediately).
for (;;) {
result = kevent(kqueueFd, changeList, changeListLength, eventList, eventListLength, fixedTs);
result = kevent(kqueueFd, changeList, changeListLength, eventList, eventListLength, &timeoutTs);
if (result >= 0) {
return result;
}

View File

@ -133,12 +133,9 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
}
private int kqueueWait(boolean oldWakeup) throws IOException {
// TODO(scott): do we need to loop here ... we already loop in keventWait to ensure we wait the expected time.
// We also do the same thing in EPOLL ... do we need to loop there?
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
// until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
if (oldWakeup && hasTasks()) {
return kqueueWaitNow();