From 46cff5399f6f069597b0f5ccccbf76038c7588b1 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 11 Dec 2019 15:43:18 +0100 Subject: [PATCH] Revert "Epoll: Avoid redundant EPOLL_CTL_MOD calls (#9397)" This reverts commit 250b279bd98e043e8c56dd9b1d6545a85790c10c. --- .../channel/epoll/AbstractEpollChannel.java | 90 +++++++++++-------- .../channel/epoll/EpollChannelConfig.java | 2 +- .../io/netty/channel/epoll/EpollHandler.java | 44 ++------- .../channel/epoll/EpollRegistration.java | 4 +- 4 files changed, 64 insertions(+), 76 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 0c8a60b625..bdbdf94513 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -69,8 +69,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann private volatile SocketAddress local; private volatile SocketAddress remote; - protected int flags = Native.EPOLLET | Native.EPOLLIN; - protected int activeFlags; + protected int flags = Native.EPOLLET; boolean inputClosedSeenErrorOnRead; boolean epollInReadyRunnablePending; @@ -110,23 +109,17 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } } - void setFlag(int flag) { + void setFlag(int flag) throws IOException { if (!isFlagSet(flag)) { flags |= flag; - updatePendingFlagsSet(); + modifyEvents(); } } - void clearFlag(int flag) { + void clearFlag(int flag) throws IOException { if (isFlagSet(flag)) { flags &= ~flag; - updatePendingFlagsSet(); - } - } - - private void updatePendingFlagsSet() { - if (isRegistered() && registration != null) { - registration.update(); + modifyEvents(); } } @@ -259,27 +252,33 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann ((SocketChannelConfig) config).isAllowHalfClosure(); } - private Runnable clearEpollInTask; - final void clearEpollIn() { // Only clear if registered with an EventLoop as otherwise - final EventLoop loop = isRegistered() ? eventLoop() : null; - final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); - if (loop == null || loop.inEventLoop()) { - unsafe.clearEpollIn0(); - return; + if (isRegistered()) { + final EventLoop loop = eventLoop(); + final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); + if (loop.inEventLoop()) { + unsafe.clearEpollIn0(); + } else { + // schedule a task to clear the EPOLLIN as it is not safe to modify it directly + loop.execute(() -> { + if (!unsafe.readPending && !config().isAutoRead()) { + // Still no read triggered so clear it now + unsafe.clearEpollIn0(); + } + }); + } + } else { + // The EventLoop is not registered atm so just update the flags so the correct value + // will be used once the channel is registered + flags &= ~Native.EPOLLIN; } - // schedule a task to clear the EPOLLIN as it is not safe to modify it directly - Runnable clearFlagTask = clearEpollInTask; - if (clearFlagTask == null) { - clearEpollInTask = clearFlagTask = () -> { - if (!unsafe.readPending && !config().isAutoRead()) { - // Still no read triggered so clear it now - unsafe.clearEpollIn0(); - } - }; + } + + private void modifyEvents() throws IOException { + if (isOpen() && isRegistered() && registration != null) { + registration.update(); } - loop.execute(clearFlagTask); } @Override @@ -411,7 +410,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 - clearEpollIn0(); + clearEpollIn(); } } @@ -441,7 +440,19 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event. - clearFlag(Native.EPOLLRDHUP); + clearEpollRdHup(); + } + + /** + * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure. + */ + private void clearEpollRdHup() { + try { + clearFlag(Native.EPOLLRDHUP); + } catch (IOException e) { + pipeline().fireExceptionCaught(e); + close(voidPromise()); + } } /** @@ -461,7 +472,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // We attempted to shutdown and failed, which means the input has already effectively been // shutdown. } - clearEpollIn0(); + clearEpollIn(); pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidPromise()); @@ -517,9 +528,16 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } protected final void clearEpollIn0() { - assert !isRegistered() || eventLoop().inEventLoop(); - readPending = false; - clearFlag(Native.EPOLLIN); + assert eventLoop().inEventLoop(); + try { + readPending = false; + clearFlag(Native.EPOLLIN); + } catch (IOException e) { + // When this happens there is something completely wrong with either the filedescriptor or epoll, + // so fire the exception through the pipeline and close the Channel. + pipeline().fireExceptionCaught(e); + unsafe().close(unsafe().voidPromise()); + } } @Override @@ -639,7 +657,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann /** * Finish the connect */ - private boolean doFinishConnect() throws IOException { + private boolean doFinishConnect() throws Exception { if (socket.finishConnect()) { clearFlag(Native.EPOLLOUT); if (requestedRemoteAddress instanceof InetSocketAddress) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java index 5777da74e2..2d10c41466 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java @@ -16,12 +16,12 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBufAllocator; + import io.netty.channel.DefaultChannelConfig; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; - import static io.netty.channel.unix.Limits.SSIZE_MAX; public class EpollChannelConfig extends DefaultChannelConfig { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollHandler.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollHandler.java index 463cae6784..f68442f708 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollHandler.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollHandler.java @@ -36,9 +36,8 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; -import java.util.BitSet; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; import static java.lang.Math.min; @@ -62,7 +61,6 @@ public class EpollHandler implements IoHandler { private final FileDescriptor eventFd; private final FileDescriptor timerFd; private final IntObjectMap channels = new IntObjectHashMap<>(4096); - private final BitSet pendingFlagChannels = new BitSet(); private final boolean allowGrowing; private final EpollEventArray events; @@ -189,8 +187,8 @@ public class EpollHandler implements IoHandler { final AbstractEpollChannel epollChannel = cast(channel); epollChannel.register0(new EpollRegistration() { @Override - public void update() { - EpollHandler.this.updatePendingFlagsSet(epollChannel); + public void update() throws IOException { + EpollHandler.this.modify(epollChannel); } @Override @@ -230,8 +228,6 @@ public class EpollHandler implements IoHandler { private void add(AbstractEpollChannel ch) throws IOException { int fd = ch.socket.intValue(); Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags); - ch.activeFlags = ch.flags; - AbstractEpollChannel old = channels.put(fd, ch); // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already @@ -239,32 +235,11 @@ public class EpollHandler implements IoHandler { assert old == null || !old.isOpen(); } - void updatePendingFlagsSet(AbstractEpollChannel ch) { - pendingFlagChannels.set(ch.socket.intValue(), ch.flags != ch.activeFlags); - } - - private void processPendingChannelFlags() { - // Call epollCtlMod for any channels that require event interest changes before epollWaiting - if (!pendingFlagChannels.isEmpty()) { - for (int fd = 0; (fd = pendingFlagChannels.nextSetBit(fd)) >= 0; pendingFlagChannels.clear(fd)) { - AbstractEpollChannel ch = channels.get(fd); - if (ch != null) { - try { - modify(ch); - } catch (IOException e) { - ch.pipeline().fireExceptionCaught(e); - ch.close(); - } - } - } - } - } /** * The flags of the given epoll was modified so update the registration */ private void modify(AbstractEpollChannel ch) throws IOException { Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags); - ch.activeFlags = ch.flags; } /** @@ -280,14 +255,10 @@ public class EpollHandler implements IoHandler { // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed. assert !ch.isOpen(); - } else { - ch.activeFlags = 0; - pendingFlagChannels.clear(fd); - if (ch.isOpen()) { - // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically - // removed once the file-descriptor is closed. - Native.epollCtlDel(epollFd.intValue(), fd); - } + } else if (ch.isOpen()) { + // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically + // removed once the file-descriptor is closed. + Native.epollCtlDel(epollFd.intValue(), fd); } } @@ -324,7 +295,6 @@ public class EpollHandler implements IoHandler { public final int run(IoExecutionContext context) { int handled = 0; try { - processPendingChannelFlags(); int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock()); switch (strategy) { case SelectStrategy.CONTINUE: diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRegistration.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRegistration.java index d8d9e0433b..2e695885c6 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRegistration.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRegistration.java @@ -20,14 +20,14 @@ import io.netty.channel.unix.IovArray; import java.io.IOException; /** - * Registration with an {@link EpollHandler}. + * Registration with an {@link EpollEventLoop}. */ interface EpollRegistration { /** * Update the registration as some flags did change. */ - void update(); + void update() throws IOException; /** * Remove the registration. No more IO will be handled for it.