diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 2f26e0731f..f01bbf3fae 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -69,7 +69,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann private volatile SocketAddress local; private volatile SocketAddress remote; - protected int flags = Native.EPOLLET; + protected int flags = Native.EPOLLET | Native.EPOLLIN; + protected int activeFlags; boolean inputClosedSeenErrorOnRead; boolean epollInReadyRunnablePending; @@ -109,17 +110,23 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } } - void setFlag(int flag) throws IOException { + void setFlag(int flag) { if (!isFlagSet(flag)) { flags |= flag; - modifyEvents(); + updatePendingFlagsSet(); } } - void clearFlag(int flag) throws IOException { + void clearFlag(int flag) { if (isFlagSet(flag)) { flags &= ~flag; - modifyEvents(); + updatePendingFlagsSet(); + } + } + + private void updatePendingFlagsSet() { + if (isRegistered()) { + ((EpollEventLoop) eventLoop()).updatePendingFlagsSet(this); } } @@ -241,33 +248,33 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann ((SocketChannelConfig) config).isAllowHalfClosure(); } + private Runnable clearEpollInTask; + final void clearEpollIn() { // Only clear if registered with an EventLoop as otherwise - if (isRegistered()) { - final EventLoop loop = eventLoop(); - final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); - if (loop.inEventLoop()) { - unsafe.clearEpollIn0(); - } else { - // schedule a task to clear the EPOLLIN as it is not safe to modify it directly - loop.execute(new Runnable() { - @Override - public void run() { - if (!unsafe.readPending && !config().isAutoRead()) { - // Still no read triggered so clear it now - unsafe.clearEpollIn0(); - } - } - }); - } - } else { - // The EventLoop is not registered atm so just update the flags so the correct value - // will be used once the channel is registered - flags &= ~Native.EPOLLIN; + final EventLoop loop = isRegistered() ? eventLoop() : null; + final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); + if (loop == null || loop.inEventLoop()) { + unsafe.clearEpollIn0(); + return; } + // schedule a task to clear the EPOLLIN as it is not safe to modify it directly + Runnable clearFlagTask = clearEpollInTask; + if (clearFlagTask == null) { + clearEpollInTask = clearFlagTask = new Runnable() { + @Override + public void run() { + if (!unsafe.readPending && !config().isAutoRead()) { + // Still no read triggered so clear it now + unsafe.clearEpollIn0(); + } + } + }; + } + loop.execute(clearFlagTask); } - private void modifyEvents() throws IOException { + void modifyEvents() throws IOException { if (isOpen() && isRegistered()) { ((EpollEventLoop) eventLoop()).modify(this); } @@ -411,7 +418,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 - clearEpollIn(); + clearEpollIn0(); } } @@ -441,19 +448,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event. - clearEpollRdHup(); - } - - /** - * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure. - */ - private void clearEpollRdHup() { - try { - clearFlag(Native.EPOLLRDHUP); - } catch (IOException e) { - pipeline().fireExceptionCaught(e); - close(voidPromise()); - } + clearFlag(Native.EPOLLRDHUP); } /** @@ -473,7 +468,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // We attempted to shutdown and failed, which means the input has already effectively been // shutdown. } - clearEpollIn(); + clearEpollIn0(); pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidPromise()); @@ -529,16 +524,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } protected final void clearEpollIn0() { - assert eventLoop().inEventLoop(); - try { - readPending = false; - clearFlag(Native.EPOLLIN); - } catch (IOException e) { - // When this happens there is something completely wrong with either the filedescriptor or epoll, - // so fire the exception through the pipeline and close the Channel. - pipeline().fireExceptionCaught(e); - unsafe().close(unsafe().voidPromise()); - } + assert !isRegistered() || eventLoop().inEventLoop(); + readPending = false; + clearFlag(Native.EPOLLIN); } @Override @@ -663,7 +651,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann /** * Finish the connect */ - private boolean doFinishConnect() throws Exception { + private boolean doFinishConnect() throws IOException { if (socket.finishConnect()) { clearFlag(Native.EPOLLOUT); if (requestedRemoteAddress instanceof InetSocketAddress) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java index 2d2610c0e2..8a946921be 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java @@ -150,8 +150,7 @@ public class EpollChannelConfig extends DefaultChannelConfig { if (mode == null) { throw new NullPointerException("mode"); } - try { - switch (mode) { + switch (mode) { case EDGE_TRIGGERED: checkChannelNotRegistered(); ((AbstractEpollChannel) channel).setFlag(Native.EPOLLET); @@ -162,9 +161,6 @@ public class EpollChannelConfig extends DefaultChannelConfig { break; default: throw new Error(); - } - } catch (IOException e) { - throw new ChannelException(e); } return this; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index e6ac6c66e3..98b69bbb4d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -33,6 +33,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; +import java.util.BitSet; import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; @@ -67,6 +68,8 @@ class EpollEventLoop extends SingleThreadEventLoop { private final FileDescriptor eventFd; private final FileDescriptor timerFd; private final IntObjectMap channels = new IntObjectHashMap(4096); + private final BitSet pendingFlagChannels = new BitSet(); + private final boolean allowGrowing; private final EpollEventArray events; @@ -295,6 +298,7 @@ class EpollEventLoop extends SingleThreadEventLoop { assert inEventLoop(); int fd = ch.socket.intValue(); Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags); + ch.activeFlags = ch.flags; AbstractEpollChannel old = channels.put(fd, ch); // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already @@ -308,6 +312,28 @@ class EpollEventLoop extends SingleThreadEventLoop { void modify(AbstractEpollChannel ch) throws IOException { assert inEventLoop(); Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags); + ch.activeFlags = ch.flags; + } + + void updatePendingFlagsSet(AbstractEpollChannel ch) { + pendingFlagChannels.set(ch.socket.intValue(), ch.flags != ch.activeFlags); + } + + private void processPendingChannelFlags() { + // Call epollCtlMod for any channels that require event interest changes before epollWaiting + if (!pendingFlagChannels.isEmpty()) { + for (int fd = 0; (fd = pendingFlagChannels.nextSetBit(fd)) >= 0; pendingFlagChannels.clear(fd)) { + AbstractEpollChannel ch = channels.get(fd); + if (ch != null) { + try { + ch.modifyEvents(); + } catch (IOException e) { + ch.pipeline().fireExceptionCaught(e); + ch.close(); + } + } + } + } } /** @@ -324,10 +350,14 @@ class EpollEventLoop extends SingleThreadEventLoop { // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed. assert !ch.isOpen(); - } else if (ch.isOpen()) { - // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically - // removed once the file-descriptor is closed. - Native.epollCtlDel(epollFd.intValue(), fd); + } else { + ch.activeFlags = 0; + pendingFlagChannels.clear(fd); + if (ch.isOpen()) { + // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically + // removed once the file-descriptor is closed. + Native.epollCtlDel(epollFd.intValue(), fd); + } } } @@ -367,6 +397,7 @@ class EpollEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { try { + processPendingChannelFlags(); int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: