Close eventfd shutdown/wakeup race by closely tracking epoll edges (#9586)

Motivation

This is another iteration of #9476.

Modifications

Instead of maintaining a count of all writes performed and then using
reads during shutdown to ensure all are accounted for, just set a flag
after each write and don't reset it until the corresponding event has
been returned from epoll_wait.

This requires that while a write is still pending we don't reset
wakenUp, i.e. continue to block writes from the wakeup() method.

Result

Race condition eliminated. Fixes #9362

Co-authored-by: Norman Maurer <norman_maurer@apple.com>
This commit is contained in:
Norman Maurer 2019-09-23 15:30:42 +02:00 committed by GitHub
parent 0a2d85f1d3
commit 76592db0bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 59 deletions

View File

@ -197,9 +197,8 @@ static void netty_epoll_native_timerFdSetTime(JNIEnv* env, jclass clazz, jint ti
}
}
static jint netty_epoll_native_epollWaitNoTimeout(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jboolean immediatePoll) {
static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
const int timeout = immediatePoll ? 0 : -1;
int result, err;
do {
@ -213,47 +212,23 @@ static jint netty_epoll_native_epollWaitNoTimeout(JNIEnv* env, jclass clazz, jin
// This method is deprecated!
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
int result, err;
if (tvSec == 0 && tvNsec == 0) {
// Zeros = poll (aka return immediately).
do {
result = epoll_wait(efd, ev, len, 0);
if (result >= 0) {
return result;
}
} while((err = errno) == EINTR);
} else {
// only reschedule the timer if there is a newer event.
// -1 is a special value used by EpollEventLoop.
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
ts.it_value.tv_nsec = tvNsec;
if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_settime() failed: ", errno);
return -1;
}
}
do {
result = epoll_wait(efd, ev, len, -1);
if (result > 0) {
// Detect timeout, and preserve the epoll_wait API.
if (result == 1 && ev[0].data.fd == timerFd) {
// We assume that timerFD is in ET mode. So we must consume this event to ensure we are notified
// of future timer events because ET mode only notifies a single time until the event is consumed.
uint64_t timerFireCount;
// We don't care what the result is. We just want to consume the wakeup event and reset ET.
result = read(timerFd, &timerFireCount, sizeof(uint64_t));
return 0;
}
return result;
}
} while((err = errno) == EINTR);
return netty_epoll_native_epollWait(env, clazz, efd, address, len, 0);
}
return -err;
// only reschedule the timer if there is a newer event.
// -1 is a special value used by EpollEventLoop.
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
ts.it_value.tv_nsec = tvNsec;
if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_settime() failed: ", errno);
return -1;
}
}
return netty_epoll_native_epollWait(env, clazz, efd, address, len, -1);
}
static inline void cpu_relax() {
@ -524,7 +499,7 @@ static const JNINativeMethod fixed_method_table[] = {
{ "timerFdSetTime", "(III)V", (void *) netty_epoll_native_timerFdSetTime },
{ "epollCreate", "()I", (void *) netty_epoll_native_epollCreate },
{ "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, // This method is deprecated!
{ "epollWaitNoTimeout", "(IJIZ)I", (void *) netty_epoll_native_epollWaitNoTimeout },
{ "epollWait", "(IJII)I", (void *) netty_epoll_native_epollWait },
{ "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 },
{ "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 },
{ "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 },

View File

@ -36,7 +36,7 @@ import java.io.IOException;
import java.util.BitSet;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Math.min;
@ -45,8 +45,6 @@ import static java.lang.Math.min;
*/
class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");
static {
// Ensure JNI is initialized by the time this class is loaded by this time!
@ -76,8 +74,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
return epollWaitNow();
}
};
@SuppressWarnings("unused") // AtomicIntegerFieldUpdater
private volatile int wakenUp;
private final AtomicInteger wakenUp = new AtomicInteger(1);
private boolean pendingWakeup;
private volatile int ioRatio = 50;
// See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
@ -180,7 +178,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.getAndSet(this, 1) == 0) {
if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
// write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L);
}
@ -307,13 +305,18 @@ class EpollEventLoop extends SingleThreadEventLoop {
}
private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, timerFd, 0, 0);
return Native.epollWait(epollFd, events, true);
}
private int epollBusyWait() throws IOException {
return Native.epollBusyWait(epollFd, events);
}
private int epollWaitTimeboxed() throws IOException {
// Wait with 1 second "safeguard" timeout
return Native.epollWait(epollFd, events, 1000);
}
@Override
protected void run() {
for (;;) {
@ -329,11 +332,34 @@ class EpollEventLoop extends SingleThreadEventLoop {
break;
case SelectStrategy.SELECT:
if (wakenUp == 1) {
wakenUp = 0;
if (pendingWakeup) {
// We are going to be immediately woken so no need to reset wakenUp
// or check for timerfd adjustment.
strategy = epollWaitTimeboxed();
if (strategy != 0) {
break;
}
// We timed out so assume that we missed the write event due to an
// abnormally failed syscall (the write itself or a prior epoll_wait)
logger.warn("Missed eventfd write (not seen after > 1 second)");
pendingWakeup = false;
if (hasTasks()) {
break;
}
// fall-through
}
if (!hasTasks()) {
strategy = epollWait();
wakenUp.set(0);
try {
if (!hasTasks()) {
strategy = epollWait();
}
} finally {
// Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task)
if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) {
pendingWakeup = true;
}
}
// fallthrough
default:
@ -417,7 +443,9 @@ class EpollEventLoop extends SingleThreadEventLoop {
private void processReady(EpollEventArray events, int ready) {
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue() || fd == timerFd.intValue()) {
if (fd == eventFd.intValue()) {
pendingWakeup = false;
} else if (fd == timerFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
@ -479,10 +507,23 @@ class EpollEventLoop extends SingleThreadEventLoop {
@Override
protected void cleanup() {
try {
try {
epollFd.close();
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
// Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
while (pendingWakeup) {
try {
int count = epollWaitTimeboxed();
if (count == 0) {
// We timed-out so assume that the write we're expecting isn't coming
break;
}
for (int i = 0; i < count; i++) {
if (events.fd(i) == eventFd.intValue()) {
pendingWakeup = false;
break;
}
}
} catch (IOException ignore) {
// ignore
}
}
try {
eventFd.close();
@ -494,6 +535,12 @@ class EpollEventLoop extends SingleThreadEventLoop {
} catch (IOException e) {
logger.warn("Failed to close the timer fd.", e);
}
try {
epollFd.close();
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
}
} finally {
// release native memory
if (iovArray != null) {

View File

@ -107,7 +107,14 @@ public final class Native {
}
static int epollWait(FileDescriptor epollFd, EpollEventArray events, boolean immediatePoll) throws IOException {
int ready = epollWaitNoTimeout(epollFd.intValue(), events.memoryAddress(), events.length(), immediatePoll);
return epollWait(epollFd, events, immediatePoll ? 0 : -1);
}
/**
* This uses epoll's own timeout and does not reset/re-arm any timerfd
*/
static int epollWait(FileDescriptor epollFd, EpollEventArray events, int timeoutMillis) throws IOException {
int ready = epollWait(epollFd.intValue(), events.memoryAddress(), events.length(), timeoutMillis);
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
@ -128,7 +135,7 @@ public final class Native {
}
private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs);
private static native int epollWaitNoTimeout(int efd, long address, int len, boolean immediatePoll);
private static native int epollWait(int efd, long address, int len, int timeout);
private static native int epollBusyWait0(int efd, long address, int len);
public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException {