Revert "Close eventfd shutdown/wakeup race by closely tracking epoll edges (#9535)"

This reverts commit 2123fbe495cc86b5a8203a0452ecdd0e5ec8d7c3.
This commit is contained in:
Norman Maurer 2019-09-12 11:41:53 +02:00
parent aef47bec7f
commit 8280252d0e
3 changed files with 30 additions and 65 deletions

View File

@ -197,8 +197,9 @@ static void netty_epoll_native_timerFdSetTime(JNIEnv* env, jclass clazz, jint ti
} }
} }
static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) { static jint netty_epoll_native_epollWaitNoTimeout(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jboolean immediatePoll) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address; struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
const int timeout = immediatePoll ? 0 : -1;
int result, err; int result, err;
do { do {
@ -523,7 +524,7 @@ static const JNINativeMethod fixed_method_table[] = {
{ "timerFdSetTime", "(III)V", (void *) netty_epoll_native_timerFdSetTime }, { "timerFdSetTime", "(III)V", (void *) netty_epoll_native_timerFdSetTime },
{ "epollCreate", "()I", (void *) netty_epoll_native_epollCreate }, { "epollCreate", "()I", (void *) netty_epoll_native_epollCreate },
{ "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, // This method is deprecated! { "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, // This method is deprecated!
{ "epollWait", "(IJII)I", (void *) netty_epoll_native_epollWait }, { "epollWaitNoTimeout", "(IJIZ)I", (void *) netty_epoll_native_epollWaitNoTimeout },
{ "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 }, { "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 },
{ "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 }, { "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 },
{ "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 }, { "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 },

View File

@ -63,8 +63,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
* with the time source (e.g. calling System.nanoTime()) which can be expensive. * with the time source (e.g. calling System.nanoTime()) which can be expensive.
*/ */
private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L); private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L);
private final AtomicInteger wakenUp = new AtomicInteger(1); private final AtomicInteger wakenUp = new AtomicInteger();
private boolean pendingWakeup;
private final FileDescriptor epollFd; private final FileDescriptor epollFd;
private final FileDescriptor eventFd; private final FileDescriptor eventFd;
private final FileDescriptor timerFd; private final FileDescriptor timerFd;
@ -355,18 +354,17 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
private int epollWait() throws IOException { private int epollWait() throws IOException {
return Native.epollWait(epollFd, events, false); // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
return Native.epollWait(epollFd, events, hasTasks());
} }
private int epollWaitNow() throws IOException { private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, true); return Native.epollWait(epollFd, events, true);
} }
private int epollWaitTimeboxed() throws IOException {
// Wait with 1 second "safeguard" timeout
return Native.epollWait(epollFd, events, 1000);
}
private int epollBusyWait() throws IOException { private int epollBusyWait() throws IOException {
return Native.epollBusyWait(epollFd, events); return Native.epollBusyWait(epollFd, events);
} }
@ -387,39 +385,17 @@ class EpollEventLoop extends SingleThreadEventLoop {
break; break;
case SelectStrategy.SELECT: case SelectStrategy.SELECT:
if (pendingWakeup) { if (wakenUp.get() == 1) {
// We are going to be immediately woken so no need to reset wakenUp wakenUp.set(0);
// or check for timerfd adjustment.
strategy = epollWaitTimeboxed();
if (strategy != 0) {
break;
}
// We timed out so assume that we missed the write event due to an
// abnormally failed syscall (the write itself or a prior epoll_wait)
pendingWakeup = false;
if (hasTasks()) {
break;
}
// fall-through
} }
// Ordered store is sufficient here since the only access outside this if (!hasTasks()) {
// thread is a getAndSet in the wakeup() method
wakenUp.lazySet(0);
try {
// When we are in the EventLoop we don't bother setting the timerFd for each // When we are in the EventLoop we don't bother setting the timerFd for each
// scheduled task, but instead defer the processing until the end of the EventLoop // scheduled task, but instead defer the processing until the end of the EventLoop
// (next wait) to reduce the timerFd modifications. // (next wait) to reduce the timerFd modifications.
timerFdDeadline = checkScheduleTaskQueueForNewDelay(timerFdDeadline); timerFdDeadline = checkScheduleTaskQueueForNewDelay(timerFdDeadline);
if (!hasTasks()) { try {
strategy = epollWait(); strategy = epollWait();
} } finally {
} finally {
// Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task)
if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) {
pendingWakeup = true;
}
if (timerFdDeadline >= 0) {
// This getAndAdd will change the raw value of nextDeadlineNanos to be negative // This getAndAdd will change the raw value of nextDeadlineNanos to be negative
// which will block any *new* timerFd mods by other threads while also "preserving" // which will block any *new* timerFd mods by other threads while also "preserving"
// its last value to avoid disrupting a possibly-concurrent setTimerFd call // its last value to avoid disrupting a possibly-concurrent setTimerFd call
@ -479,6 +455,12 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
private void closeAll() { private void closeAll() {
try {
epollWaitNow();
} catch (IOException ignore) {
// ignore on close
}
// Using the intermediate collection to prevent ConcurrentModificationException. // Using the intermediate collection to prevent ConcurrentModificationException.
// In the `close()` method, the channel is deleted from `channels` map. // In the `close()` method, the channel is deleted from `channels` map.
AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]); AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
@ -494,7 +476,9 @@ class EpollEventLoop extends SingleThreadEventLoop {
for (int i = 0; i < ready; ++i) { for (int i = 0; i < ready; ++i) {
final int fd = events.fd(i); final int fd = events.fd(i);
if (fd == eventFd.intValue()) { if (fd == eventFd.intValue()) {
pendingWakeup = false; // Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
} else if (fd == timerFd.intValue()) { } else if (fd == timerFd.intValue()) {
timerFired = true; timerFired = true;
} else { } else {
@ -556,30 +540,16 @@ class EpollEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void cleanup() { protected void cleanup() {
try { try {
try {
// Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
while (pendingWakeup) {
int count = epollWaitTimeboxed();
if (count == 0) {
// We timed-out so assume that the write we're expecting isn't coming
break;
}
for (int i = 0; i < count; i++) {
if (events.fd(i) == eventFd.intValue()) {
pendingWakeup = false;
break;
}
}
}
eventFd.close();
} catch (IOException e) {
logger.warn("Failed to close the event fd.", e);
}
try { try {
epollFd.close(); epollFd.close();
} catch (IOException e) { } catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e); logger.warn("Failed to close the epoll fd.", e);
} }
try {
eventFd.close();
} catch (IOException e) {
logger.warn("Failed to close the event fd.", e);
}
try { try {
timerFd.close(); timerFd.close();
} catch (IOException e) { } catch (IOException e) {

View File

@ -21,7 +21,6 @@ import io.netty.util.internal.NativeLibraryLoader;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -45,7 +44,6 @@ import static io.netty.channel.unix.Errors.newIOException;
* <p><strong>Internal usage only!</strong> * <p><strong>Internal usage only!</strong>
* <p>Static members which call JNI methods must be defined in {@link NativeStaticallyReferencedJniMethods}. * <p>Static members which call JNI methods must be defined in {@link NativeStaticallyReferencedJniMethods}.
*/ */
@UnstableApi
public final class Native { public final class Native {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);
@ -109,11 +107,7 @@ public final class Native {
} }
static int epollWait(FileDescriptor epollFd, EpollEventArray events, boolean immediatePoll) throws IOException { static int epollWait(FileDescriptor epollFd, EpollEventArray events, boolean immediatePoll) throws IOException {
return epollWait(epollFd, events, immediatePoll ? 0 : -1); int ready = epollWaitNoTimeout(epollFd.intValue(), events.memoryAddress(), events.length(), immediatePoll);
}
static int epollWait(FileDescriptor epollFd, EpollEventArray events, int timeout) throws IOException {
int ready = epollWait(epollFd.intValue(), events.memoryAddress(), events.length(), timeout);
if (ready < 0) { if (ready < 0) {
throw newIOException("epoll_wait", ready); throw newIOException("epoll_wait", ready);
} }
@ -134,7 +128,7 @@ public final class Native {
} }
private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs); private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs);
private static native int epollWait(int efd, long address, int len, int timeout); private static native int epollWaitNoTimeout(int efd, long address, int len, boolean immediatePoll);
private static native int epollBusyWait0(int efd, long address, int len); private static native int epollBusyWait0(int efd, long address, int len);
public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException { public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException {