diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index a2f1a31505..ee9a0cd0f0 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -197,8 +197,9 @@ static void netty_epoll_native_timerFdSetTime(JNIEnv* env, jclass clazz, jint ti } } -static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) { +static jint netty_epoll_native_epollWaitNoTimeout(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jboolean immediatePoll) { struct epoll_event *ev = (struct epoll_event*) (intptr_t) address; + const int timeout = immediatePoll ? 0 : -1; int result, err; do { @@ -523,7 +524,7 @@ static const JNINativeMethod fixed_method_table[] = { { "timerFdSetTime", "(III)V", (void *) netty_epoll_native_timerFdSetTime }, { "epollCreate", "()I", (void *) netty_epoll_native_epollCreate }, { "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, // This method is deprecated! - { "epollWait", "(IJII)I", (void *) netty_epoll_native_epollWait }, + { "epollWaitNoTimeout", "(IJIZ)I", (void *) netty_epoll_native_epollWaitNoTimeout }, { "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 }, { "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 }, { "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 }, diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 4d2e5057c7..187265d4f7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -63,8 +63,7 @@ class EpollEventLoop extends SingleThreadEventLoop { * with the time source (e.g. calling System.nanoTime()) which can be expensive. */ private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L); - private final AtomicInteger wakenUp = new AtomicInteger(1); - private boolean pendingWakeup; + private final AtomicInteger wakenUp = new AtomicInteger(); private final FileDescriptor epollFd; private final FileDescriptor eventFd; private final FileDescriptor timerFd; @@ -355,18 +354,17 @@ class EpollEventLoop extends SingleThreadEventLoop { } private int epollWait() throws IOException { - return Native.epollWait(epollFd, events, false); + // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. + // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended + // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed + // in pipeline. + return Native.epollWait(epollFd, events, hasTasks()); } private int epollWaitNow() throws IOException { return Native.epollWait(epollFd, events, true); } - private int epollWaitTimeboxed() throws IOException { - // Wait with 1 second "safeguard" timeout - return Native.epollWait(epollFd, events, 1000); - } - private int epollBusyWait() throws IOException { return Native.epollBusyWait(epollFd, events); } @@ -387,39 +385,17 @@ class EpollEventLoop extends SingleThreadEventLoop { break; case SelectStrategy.SELECT: - if (pendingWakeup) { - // We are going to be immediately woken so no need to reset wakenUp - // or check for timerfd adjustment. - strategy = epollWaitTimeboxed(); - if (strategy != 0) { - break; - } - // We timed out so assume that we missed the write event due to an - // abnormally failed syscall (the write itself or a prior epoll_wait) - pendingWakeup = false; - if (hasTasks()) { - break; - } - // fall-through + if (wakenUp.get() == 1) { + wakenUp.set(0); } - // Ordered store is sufficient here since the only access outside this - // thread is a getAndSet in the wakeup() method - wakenUp.lazySet(0); - try { + if (!hasTasks()) { // When we are in the EventLoop we don't bother setting the timerFd for each // scheduled task, but instead defer the processing until the end of the EventLoop // (next wait) to reduce the timerFd modifications. timerFdDeadline = checkScheduleTaskQueueForNewDelay(timerFdDeadline); - if (!hasTasks()) { + try { strategy = epollWait(); - } - } finally { - // Try get() first to avoid much more expensive CAS in the case we - // were woken via the wakeup() method (submitted task) - if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) { - pendingWakeup = true; - } - if (timerFdDeadline >= 0) { + } finally { // This getAndAdd will change the raw value of nextDeadlineNanos to be negative // which will block any *new* timerFd mods by other threads while also "preserving" // its last value to avoid disrupting a possibly-concurrent setTimerFd call @@ -479,6 +455,12 @@ class EpollEventLoop extends SingleThreadEventLoop { } private void closeAll() { + try { + epollWaitNow(); + } catch (IOException ignore) { + // ignore on close + } + // Using the intermediate collection to prevent ConcurrentModificationException. // In the `close()` method, the channel is deleted from `channels` map. AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]); @@ -494,7 +476,9 @@ class EpollEventLoop extends SingleThreadEventLoop { for (int i = 0; i < ready; ++i) { final int fd = events.fd(i); if (fd == eventFd.intValue()) { - pendingWakeup = false; + // Just ignore as we use ET mode for the eventfd and timerfd. + // + // See also https://stackoverflow.com/a/12492308/1074097 } else if (fd == timerFd.intValue()) { timerFired = true; } else { @@ -556,30 +540,16 @@ class EpollEventLoop extends SingleThreadEventLoop { @Override protected void cleanup() { try { - try { - // Ensure any in-flight wakeup writes have been performed prior to closing eventFd. - while (pendingWakeup) { - int count = epollWaitTimeboxed(); - if (count == 0) { - // We timed-out so assume that the write we're expecting isn't coming - break; - } - for (int i = 0; i < count; i++) { - if (events.fd(i) == eventFd.intValue()) { - pendingWakeup = false; - break; - } - } - } - eventFd.close(); - } catch (IOException e) { - logger.warn("Failed to close the event fd.", e); - } try { epollFd.close(); } catch (IOException e) { logger.warn("Failed to close the epoll fd.", e); } + try { + eventFd.close(); + } catch (IOException e) { + logger.warn("Failed to close the event fd.", e); + } try { timerFd.close(); } catch (IOException e) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index db7bf8d3c2..40af0e742a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -21,7 +21,6 @@ import io.netty.util.internal.NativeLibraryLoader; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.ThrowableUtil; -import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -45,7 +44,6 @@ import static io.netty.channel.unix.Errors.newIOException; *
Internal usage only! *
Static members which call JNI methods must be defined in {@link NativeStaticallyReferencedJniMethods}. */ -@UnstableApi public final class Native { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class); @@ -109,11 +107,7 @@ public final class Native { } static int epollWait(FileDescriptor epollFd, EpollEventArray events, boolean immediatePoll) throws IOException { - return epollWait(epollFd, events, immediatePoll ? 0 : -1); - } - - static int epollWait(FileDescriptor epollFd, EpollEventArray events, int timeout) throws IOException { - int ready = epollWait(epollFd.intValue(), events.memoryAddress(), events.length(), timeout); + int ready = epollWaitNoTimeout(epollFd.intValue(), events.memoryAddress(), events.length(), immediatePoll); if (ready < 0) { throw newIOException("epoll_wait", ready); } @@ -134,7 +128,7 @@ public final class Native { } private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs); - private static native int epollWait(int efd, long address, int len, int timeout); + private static native int epollWaitNoTimeout(int efd, long address, int len, boolean immediatePoll); private static native int epollBusyWait0(int efd, long address, int len); public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException {