From 2b9f69ac3833b5409b7115b999967cdf3e4e4968 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 20 Sep 2019 07:49:37 +0200 Subject: [PATCH] Epoll: Avoid redundant EPOLL_CTL_MOD calls (#9397) (#9583) Motivation Currently an epoll_ctl syscall is made every time there is a change to the event interest flags (EPOLLIN, EPOLLOUT, etc) of a channel. These are only done in the event loop so can be aggregated into 0 or 1 such calls per channel prior to the next call to epoll_wait. Modifications I think further streamlining/simplification is possible but for now I've tried to minimize structural changes and added the aggregation beneath the existing flag manipulation logic. A new AbstractEpollChannel#activeFlags field records the flags last set on the epoll fd for that channel. Calls to setFlag/clearFlag update the flags field as before but instead of calling epoll_ctl immediately, just set or clear a bit for the channel in a new bitset in the associated EpollEventLoop to reflect whether there's any change to the last set value. Prior to calling epoll_wait the event loop makes the appropriate epoll_ctl(EPOLL_CTL_MOD) call once for each channel whose bit is set. Result Fewer syscalls, particularly in some auto-read=false cases. Simplified error handling from centralization of these calls. 
--- .../channel/epoll/AbstractEpollChannel.java | 94 ++++++++----------- .../channel/epoll/EpollChannelConfig.java | 6 +- .../netty/channel/epoll/EpollEventLoop.java | 39 +++++++- 3 files changed, 77 insertions(+), 62 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 41354197ce..a8e7731c69 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -69,7 +69,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann private volatile SocketAddress local; private volatile SocketAddress remote; - protected int flags = Native.EPOLLET; + protected int flags = Native.EPOLLET | Native.EPOLLIN; + protected int activeFlags; boolean inputClosedSeenErrorOnRead; boolean epollInReadyRunnablePending; @@ -109,17 +110,23 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } } - void setFlag(int flag) throws IOException { + void setFlag(int flag) { if (!isFlagSet(flag)) { flags |= flag; - modifyEvents(); + updatePendingFlagsSet(); } } - void clearFlag(int flag) throws IOException { + void clearFlag(int flag) { if (isFlagSet(flag)) { flags &= ~flag; - modifyEvents(); + updatePendingFlagsSet(); + } + } + + private void updatePendingFlagsSet() { + if (isRegistered()) { + ((EpollEventLoop) eventLoop()).updatePendingFlagsSet(this); } } @@ -246,33 +253,33 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann ((SocketChannelConfig) config).isAllowHalfClosure(); } + private Runnable clearEpollInTask; + final void clearEpollIn() { // Only clear if registered with an EventLoop as otherwise - if (isRegistered()) { - final EventLoop loop = eventLoop(); - final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); - 
if (loop.inEventLoop()) { - unsafe.clearEpollIn0(); - } else { - // schedule a task to clear the EPOLLIN as it is not safe to modify it directly - loop.execute(new Runnable() { - @Override - public void run() { - if (!unsafe.readPending && !config().isAutoRead()) { - // Still no read triggered so clear it now - unsafe.clearEpollIn0(); - } - } - }); - } - } else { - // The EventLoop is not registered atm so just update the flags so the correct value - // will be used once the channel is registered - flags &= ~Native.EPOLLIN; + final EventLoop loop = isRegistered() ? eventLoop() : null; + final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); + if (loop == null || loop.inEventLoop()) { + unsafe.clearEpollIn0(); + return; } + // schedule a task to clear the EPOLLIN as it is not safe to modify it directly + Runnable clearFlagTask = clearEpollInTask; + if (clearFlagTask == null) { + clearEpollInTask = clearFlagTask = new Runnable() { + @Override + public void run() { + if (!unsafe.readPending && !config().isAutoRead()) { + // Still no read triggered so clear it now + unsafe.clearEpollIn0(); + } + } + }; + } + loop.execute(clearFlagTask); } - private void modifyEvents() throws IOException { + void modifyEvents() throws IOException { if (isOpen() && isRegistered()) { ((EpollEventLoop) eventLoop()).modify(this); } @@ -416,7 +423,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 - clearEpollIn(); + clearEpollIn0(); } } @@ -446,19 +453,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event. - clearEpollRdHup(); - } - - /** - * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure. 
- */ - private void clearEpollRdHup() { - try { - clearFlag(Native.EPOLLRDHUP); - } catch (IOException e) { - pipeline().fireExceptionCaught(e); - close(voidPromise()); - } + clearFlag(Native.EPOLLRDHUP); } /** @@ -478,7 +473,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // We attempted to shutdown and failed, which means the input has already effectively been // shutdown. } - clearEpollIn(); + clearEpollIn0(); pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidPromise()); @@ -534,16 +529,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } protected final void clearEpollIn0() { - assert eventLoop().inEventLoop(); - try { - readPending = false; - clearFlag(Native.EPOLLIN); - } catch (IOException e) { - // When this happens there is something completely wrong with either the filedescriptor or epoll, - // so fire the exception through the pipeline and close the Channel. - pipeline().fireExceptionCaught(e); - unsafe().close(unsafe().voidPromise()); - } + assert !isRegistered() || eventLoop().inEventLoop(); + readPending = false; + clearFlag(Native.EPOLLIN); } @Override @@ -668,7 +656,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann /** * Finish the connect */ - private boolean doFinishConnect() throws Exception { + private boolean doFinishConnect() throws IOException { if (socket.finishConnect()) { clearFlag(Native.EPOLLOUT); if (requestedRemoteAddress instanceof InetSocketAddress) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java index 2d2610c0e2..8a946921be 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java @@ -150,8 +150,7 @@ public class 
EpollChannelConfig extends DefaultChannelConfig { if (mode == null) { throw new NullPointerException("mode"); } - try { - switch (mode) { + switch (mode) { case EDGE_TRIGGERED: checkChannelNotRegistered(); ((AbstractEpollChannel) channel).setFlag(Native.EPOLLET); @@ -162,9 +161,6 @@ public class EpollChannelConfig extends DefaultChannelConfig { break; default: throw new Error(); - } - } catch (IOException e) { - throw new ChannelException(e); } return this; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 27bc1d1065..ffcf48aa6b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -33,6 +33,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; +import java.util.BitSet; import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -59,6 +60,8 @@ class EpollEventLoop extends SingleThreadEventLoop { private final FileDescriptor eventFd; private final FileDescriptor timerFd; private final IntObjectMap channels = new IntObjectHashMap(4096); + private final BitSet pendingFlagChannels = new BitSet(); + private final boolean allowGrowing; private final EpollEventArray events; @@ -190,6 +193,7 @@ class EpollEventLoop extends SingleThreadEventLoop { assert inEventLoop(); int fd = ch.socket.intValue(); Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags); + ch.activeFlags = ch.flags; AbstractEpollChannel old = channels.put(fd, ch); // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already @@ -203,6 +207,28 @@ class EpollEventLoop extends SingleThreadEventLoop { void modify(AbstractEpollChannel ch) throws 
IOException { assert inEventLoop(); Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags); + ch.activeFlags = ch.flags; + } + + void updatePendingFlagsSet(AbstractEpollChannel ch) { + pendingFlagChannels.set(ch.socket.intValue(), ch.flags != ch.activeFlags); + } + + private void processPendingChannelFlags() { + // Call epollCtlMod for any channels that require event interest changes before epollWaiting + if (!pendingFlagChannels.isEmpty()) { + for (int fd = 0; (fd = pendingFlagChannels.nextSetBit(fd)) >= 0; pendingFlagChannels.clear(fd)) { + AbstractEpollChannel ch = channels.get(fd); + if (ch != null) { + try { + ch.modifyEvents(); + } catch (IOException e) { + ch.pipeline().fireExceptionCaught(e); + ch.close(); + } + } + } + } } /** @@ -219,10 +245,14 @@ class EpollEventLoop extends SingleThreadEventLoop { // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed. assert !ch.isOpen(); - } else if (ch.isOpen()) { - // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically - // removed once the file-descriptor is closed. - Native.epollCtlDel(epollFd.intValue(), fd); + } else { + ch.activeFlags = 0; + pendingFlagChannels.clear(fd); + if (ch.isOpen()) { + // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically + // removed once the file-descriptor is closed. + Native.epollCtlDel(epollFd.intValue(), fd); + } } } @@ -288,6 +318,7 @@ class EpollEventLoop extends SingleThreadEventLoop { protected void run() { for (;;) { try { + processPendingChannelFlags(); int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: