Revert "Epoll: Avoid redundant EPOLL_CTL_MOD calls (#9397)"

This reverts commit 250b279bd9.
This commit is contained in:
Norman Maurer 2019-12-11 15:43:18 +01:00
parent d7bb05b1ac
commit 46cff5399f
4 changed files with 64 additions and 76 deletions

View File

@ -69,8 +69,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
private volatile SocketAddress local; private volatile SocketAddress local;
private volatile SocketAddress remote; private volatile SocketAddress remote;
protected int flags = Native.EPOLLET | Native.EPOLLIN; protected int flags = Native.EPOLLET;
protected int activeFlags;
boolean inputClosedSeenErrorOnRead; boolean inputClosedSeenErrorOnRead;
boolean epollInReadyRunnablePending; boolean epollInReadyRunnablePending;
@ -110,23 +109,17 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
} }
} }
void setFlag(int flag) { void setFlag(int flag) throws IOException {
if (!isFlagSet(flag)) { if (!isFlagSet(flag)) {
flags |= flag; flags |= flag;
updatePendingFlagsSet(); modifyEvents();
} }
} }
void clearFlag(int flag) { void clearFlag(int flag) throws IOException {
if (isFlagSet(flag)) { if (isFlagSet(flag)) {
flags &= ~flag; flags &= ~flag;
updatePendingFlagsSet(); modifyEvents();
}
}
private void updatePendingFlagsSet() {
if (isRegistered() && registration != null) {
registration.update();
} }
} }
@ -259,27 +252,33 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
((SocketChannelConfig) config).isAllowHalfClosure(); ((SocketChannelConfig) config).isAllowHalfClosure();
} }
private Runnable clearEpollInTask;
final void clearEpollIn() { final void clearEpollIn() {
// Only clear if registered with an EventLoop as otherwise // Only clear if registered with an EventLoop as otherwise
final EventLoop loop = isRegistered() ? eventLoop() : null; if (isRegistered()) {
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); final EventLoop loop = eventLoop();
if (loop == null || loop.inEventLoop()) { final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
unsafe.clearEpollIn0(); if (loop.inEventLoop()) {
return; unsafe.clearEpollIn0();
} else {
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
loop.execute(() -> {
if (!unsafe.readPending && !config().isAutoRead()) {
// Still no read triggered so clear it now
unsafe.clearEpollIn0();
}
});
}
} else {
// The EventLoop is not registered atm so just update the flags so the correct value
// will be used once the channel is registered
flags &= ~Native.EPOLLIN;
} }
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly }
Runnable clearFlagTask = clearEpollInTask;
if (clearFlagTask == null) { private void modifyEvents() throws IOException {
clearEpollInTask = clearFlagTask = () -> { if (isOpen() && isRegistered() && registration != null) {
if (!unsafe.readPending && !config().isAutoRead()) { registration.update();
// Still no read triggered so clear it now
unsafe.clearEpollIn0();
}
};
} }
loop.execute(clearFlagTask);
} }
@Override @Override
@ -411,7 +410,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
// //
// See https://github.com/netty/netty/issues/2254 // See https://github.com/netty/netty/issues/2254
clearEpollIn0(); clearEpollIn();
} }
} }
@ -441,7 +440,19 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
} }
// Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event. // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
clearFlag(Native.EPOLLRDHUP); clearEpollRdHup();
}
/**
* Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
*/
private void clearEpollRdHup() {
try {
clearFlag(Native.EPOLLRDHUP);
} catch (IOException e) {
pipeline().fireExceptionCaught(e);
close(voidPromise());
}
} }
/** /**
@ -461,7 +472,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// We attempted to shutdown and failed, which means the input has already effectively been // We attempted to shutdown and failed, which means the input has already effectively been
// shutdown. // shutdown.
} }
clearEpollIn0(); clearEpollIn();
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else { } else {
close(voidPromise()); close(voidPromise());
@ -517,9 +528,16 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
} }
protected final void clearEpollIn0() { protected final void clearEpollIn0() {
assert !isRegistered() || eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
readPending = false; try {
clearFlag(Native.EPOLLIN); readPending = false;
clearFlag(Native.EPOLLIN);
} catch (IOException e) {
// When this happens there is something completely wrong with either the filedescriptor or epoll,
// so fire the exception through the pipeline and close the Channel.
pipeline().fireExceptionCaught(e);
unsafe().close(unsafe().voidPromise());
}
} }
@Override @Override
@ -639,7 +657,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
/** /**
* Finish the connect * Finish the connect
*/ */
private boolean doFinishConnect() throws IOException { private boolean doFinishConnect() throws Exception {
if (socket.finishConnect()) { if (socket.finishConnect()) {
clearFlag(Native.EPOLLOUT); clearFlag(Native.EPOLLOUT);
if (requestedRemoteAddress instanceof InetSocketAddress) { if (requestedRemoteAddress instanceof InetSocketAddress) {

View File

@ -16,12 +16,12 @@
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import static io.netty.channel.unix.Limits.SSIZE_MAX; import static io.netty.channel.unix.Limits.SSIZE_MAX;
public class EpollChannelConfig extends DefaultChannelConfig { public class EpollChannelConfig extends DefaultChannelConfig {

View File

@ -36,9 +36,8 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import static java.lang.Math.min; import static java.lang.Math.min;
@ -62,7 +61,6 @@ public class EpollHandler implements IoHandler {
private final FileDescriptor eventFd; private final FileDescriptor eventFd;
private final FileDescriptor timerFd; private final FileDescriptor timerFd;
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<>(4096); private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<>(4096);
private final BitSet pendingFlagChannels = new BitSet();
private final boolean allowGrowing; private final boolean allowGrowing;
private final EpollEventArray events; private final EpollEventArray events;
@ -189,8 +187,8 @@ public class EpollHandler implements IoHandler {
final AbstractEpollChannel epollChannel = cast(channel); final AbstractEpollChannel epollChannel = cast(channel);
epollChannel.register0(new EpollRegistration() { epollChannel.register0(new EpollRegistration() {
@Override @Override
public void update() { public void update() throws IOException {
EpollHandler.this.updatePendingFlagsSet(epollChannel); EpollHandler.this.modify(epollChannel);
} }
@Override @Override
@ -230,8 +228,6 @@ public class EpollHandler implements IoHandler {
private void add(AbstractEpollChannel ch) throws IOException { private void add(AbstractEpollChannel ch) throws IOException {
int fd = ch.socket.intValue(); int fd = ch.socket.intValue();
Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags); Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
ch.activeFlags = ch.flags;
AbstractEpollChannel old = channels.put(fd, ch); AbstractEpollChannel old = channels.put(fd, ch);
// We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
@ -239,32 +235,11 @@ public class EpollHandler implements IoHandler {
assert old == null || !old.isOpen(); assert old == null || !old.isOpen();
} }
void updatePendingFlagsSet(AbstractEpollChannel ch) {
pendingFlagChannels.set(ch.socket.intValue(), ch.flags != ch.activeFlags);
}
private void processPendingChannelFlags() {
// Call epollCtlMod for any channels that require event interest changes before epollWaiting
if (!pendingFlagChannels.isEmpty()) {
for (int fd = 0; (fd = pendingFlagChannels.nextSetBit(fd)) >= 0; pendingFlagChannels.clear(fd)) {
AbstractEpollChannel ch = channels.get(fd);
if (ch != null) {
try {
modify(ch);
} catch (IOException e) {
ch.pipeline().fireExceptionCaught(e);
ch.close();
}
}
}
}
}
/** /**
* The flags of the given epoll was modified so update the registration * The flags of the given epoll was modified so update the registration
*/ */
private void modify(AbstractEpollChannel ch) throws IOException { private void modify(AbstractEpollChannel ch) throws IOException {
Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags); Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
ch.activeFlags = ch.flags;
} }
/** /**
@ -280,14 +255,10 @@ public class EpollHandler implements IoHandler {
// If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed. // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
assert !ch.isOpen(); assert !ch.isOpen();
} else { } else if (ch.isOpen()) {
ch.activeFlags = 0; // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
pendingFlagChannels.clear(fd); // removed once the file-descriptor is closed.
if (ch.isOpen()) { Native.epollCtlDel(epollFd.intValue(), fd);
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
Native.epollCtlDel(epollFd.intValue(), fd);
}
} }
} }
@ -324,7 +295,6 @@ public class EpollHandler implements IoHandler {
public final int run(IoExecutionContext context) { public final int run(IoExecutionContext context) {
int handled = 0; int handled = 0;
try { try {
processPendingChannelFlags();
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock()); int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
switch (strategy) { switch (strategy) {
case SelectStrategy.CONTINUE: case SelectStrategy.CONTINUE:

View File

@ -20,14 +20,14 @@ import io.netty.channel.unix.IovArray;
import java.io.IOException; import java.io.IOException;
/** /**
* Registration with an {@link EpollHandler}. * Registration with an {@link EpollEventLoop}.
*/ */
interface EpollRegistration { interface EpollRegistration {
/** /**
* Update the registration as some flags did change. * Update the registration as some flags did change.
*/ */
void update(); void update() throws IOException;
/** /**
* Remove the registration. No more IO will be handled for it. * Remove the registration. No more IO will be handled for it.