Motivation Currently an epoll_ctl syscall is made every time there is a change to the event interest flags (EPOLLIN, EPOLLOUT, etc) of a channel. These are only done in the event loop so can be aggregated into 0 or 1 such calls per channel prior to the next call to epoll_wait. Modifications I think further streamlining/simplification is possible but for now I've tried to minimize structural changes and added the aggregation beneath the existing flag manipulation logic. A new AbstractChannel#activeFlags field records the flags last set on the epoll fd for that channel. Calls to setFlag/clearFlag update the flags field as before but instead of calling epoll_ctl immediately, just set or clear a bit for the channel in a new bitset in the associated EpollEventLoop to reflect whether there's any change to the last set value. Prior to calling epoll_wait the event loop makes the appropriate epoll_ctl(EPOLL_CTL_MOD) call once for each channel who's bit is set. Result Fewer syscalls, particularly in some auto-read=false cases. Simplified error handling from centralization of these calls.
This commit is contained in:
parent
338e1a991c
commit
2b9f69ac38
@ -69,7 +69,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
private volatile SocketAddress local;
|
||||
private volatile SocketAddress remote;
|
||||
|
||||
protected int flags = Native.EPOLLET;
|
||||
protected int flags = Native.EPOLLET | Native.EPOLLIN;
|
||||
protected int activeFlags;
|
||||
boolean inputClosedSeenErrorOnRead;
|
||||
boolean epollInReadyRunnablePending;
|
||||
|
||||
@ -109,17 +110,23 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
}
|
||||
}
|
||||
|
||||
void setFlag(int flag) throws IOException {
|
||||
void setFlag(int flag) {
|
||||
if (!isFlagSet(flag)) {
|
||||
flags |= flag;
|
||||
modifyEvents();
|
||||
updatePendingFlagsSet();
|
||||
}
|
||||
}
|
||||
|
||||
void clearFlag(int flag) throws IOException {
|
||||
void clearFlag(int flag) {
|
||||
if (isFlagSet(flag)) {
|
||||
flags &= ~flag;
|
||||
modifyEvents();
|
||||
updatePendingFlagsSet();
|
||||
}
|
||||
}
|
||||
|
||||
private void updatePendingFlagsSet() {
|
||||
if (isRegistered()) {
|
||||
((EpollEventLoop) eventLoop()).updatePendingFlagsSet(this);
|
||||
}
|
||||
}
|
||||
|
||||
@ -246,33 +253,33 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
((SocketChannelConfig) config).isAllowHalfClosure();
|
||||
}
|
||||
|
||||
private Runnable clearEpollInTask;
|
||||
|
||||
final void clearEpollIn() {
|
||||
// Only clear if registered with an EventLoop as otherwise
|
||||
if (isRegistered()) {
|
||||
final EventLoop loop = eventLoop();
|
||||
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
|
||||
if (loop.inEventLoop()) {
|
||||
unsafe.clearEpollIn0();
|
||||
} else {
|
||||
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
|
||||
loop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (!unsafe.readPending && !config().isAutoRead()) {
|
||||
// Still no read triggered so clear it now
|
||||
unsafe.clearEpollIn0();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// The EventLoop is not registered atm so just update the flags so the correct value
|
||||
// will be used once the channel is registered
|
||||
flags &= ~Native.EPOLLIN;
|
||||
final EventLoop loop = isRegistered() ? eventLoop() : null;
|
||||
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
|
||||
if (loop == null || loop.inEventLoop()) {
|
||||
unsafe.clearEpollIn0();
|
||||
return;
|
||||
}
|
||||
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
|
||||
Runnable clearFlagTask = clearEpollInTask;
|
||||
if (clearFlagTask == null) {
|
||||
clearEpollInTask = clearFlagTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (!unsafe.readPending && !config().isAutoRead()) {
|
||||
// Still no read triggered so clear it now
|
||||
unsafe.clearEpollIn0();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
loop.execute(clearFlagTask);
|
||||
}
|
||||
|
||||
private void modifyEvents() throws IOException {
|
||||
void modifyEvents() throws IOException {
|
||||
if (isOpen() && isRegistered()) {
|
||||
((EpollEventLoop) eventLoop()).modify(this);
|
||||
}
|
||||
@ -416,7 +423,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2254
|
||||
clearEpollIn();
|
||||
clearEpollIn0();
|
||||
}
|
||||
}
|
||||
|
||||
@ -446,19 +453,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
}
|
||||
|
||||
// Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
|
||||
clearEpollRdHup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
|
||||
*/
|
||||
private void clearEpollRdHup() {
|
||||
try {
|
||||
clearFlag(Native.EPOLLRDHUP);
|
||||
} catch (IOException e) {
|
||||
pipeline().fireExceptionCaught(e);
|
||||
close(voidPromise());
|
||||
}
|
||||
clearFlag(Native.EPOLLRDHUP);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -478,7 +473,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
// We attempted to shutdown and failed, which means the input has already effectively been
|
||||
// shutdown.
|
||||
}
|
||||
clearEpollIn();
|
||||
clearEpollIn0();
|
||||
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||
} else {
|
||||
close(voidPromise());
|
||||
@ -534,16 +529,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
}
|
||||
|
||||
protected final void clearEpollIn0() {
|
||||
assert eventLoop().inEventLoop();
|
||||
try {
|
||||
readPending = false;
|
||||
clearFlag(Native.EPOLLIN);
|
||||
} catch (IOException e) {
|
||||
// When this happens there is something completely wrong with either the filedescriptor or epoll,
|
||||
// so fire the exception through the pipeline and close the Channel.
|
||||
pipeline().fireExceptionCaught(e);
|
||||
unsafe().close(unsafe().voidPromise());
|
||||
}
|
||||
assert !isRegistered() || eventLoop().inEventLoop();
|
||||
readPending = false;
|
||||
clearFlag(Native.EPOLLIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -668,7 +656,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
||||
/**
|
||||
* Finish the connect
|
||||
*/
|
||||
private boolean doFinishConnect() throws Exception {
|
||||
private boolean doFinishConnect() throws IOException {
|
||||
if (socket.finishConnect()) {
|
||||
clearFlag(Native.EPOLLOUT);
|
||||
if (requestedRemoteAddress instanceof InetSocketAddress) {
|
||||
|
@ -150,8 +150,7 @@ public class EpollChannelConfig extends DefaultChannelConfig {
|
||||
if (mode == null) {
|
||||
throw new NullPointerException("mode");
|
||||
}
|
||||
try {
|
||||
switch (mode) {
|
||||
switch (mode) {
|
||||
case EDGE_TRIGGERED:
|
||||
checkChannelNotRegistered();
|
||||
((AbstractEpollChannel) channel).setFlag(Native.EPOLLET);
|
||||
@ -162,9 +161,6 @@ public class EpollChannelConfig extends DefaultChannelConfig {
|
||||
break;
|
||||
default:
|
||||
throw new Error();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.BitSet;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
@ -59,6 +60,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
private final FileDescriptor eventFd;
|
||||
private final FileDescriptor timerFd;
|
||||
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
|
||||
private final BitSet pendingFlagChannels = new BitSet();
|
||||
|
||||
private final boolean allowGrowing;
|
||||
private final EpollEventArray events;
|
||||
|
||||
@ -190,6 +193,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
assert inEventLoop();
|
||||
int fd = ch.socket.intValue();
|
||||
Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
|
||||
ch.activeFlags = ch.flags;
|
||||
AbstractEpollChannel old = channels.put(fd, ch);
|
||||
|
||||
// We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
|
||||
@ -203,6 +207,28 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
void modify(AbstractEpollChannel ch) throws IOException {
|
||||
assert inEventLoop();
|
||||
Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
|
||||
ch.activeFlags = ch.flags;
|
||||
}
|
||||
|
||||
void updatePendingFlagsSet(AbstractEpollChannel ch) {
|
||||
pendingFlagChannels.set(ch.socket.intValue(), ch.flags != ch.activeFlags);
|
||||
}
|
||||
|
||||
private void processPendingChannelFlags() {
|
||||
// Call epollCtlMod for any channels that require event interest changes before epollWaiting
|
||||
if (!pendingFlagChannels.isEmpty()) {
|
||||
for (int fd = 0; (fd = pendingFlagChannels.nextSetBit(fd)) >= 0; pendingFlagChannels.clear(fd)) {
|
||||
AbstractEpollChannel ch = channels.get(fd);
|
||||
if (ch != null) {
|
||||
try {
|
||||
ch.modifyEvents();
|
||||
} catch (IOException e) {
|
||||
ch.pipeline().fireExceptionCaught(e);
|
||||
ch.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -219,10 +245,14 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
|
||||
// If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
|
||||
assert !ch.isOpen();
|
||||
} else if (ch.isOpen()) {
|
||||
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
|
||||
// removed once the file-descriptor is closed.
|
||||
Native.epollCtlDel(epollFd.intValue(), fd);
|
||||
} else {
|
||||
ch.activeFlags = 0;
|
||||
pendingFlagChannels.clear(fd);
|
||||
if (ch.isOpen()) {
|
||||
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
|
||||
// removed once the file-descriptor is closed.
|
||||
Native.epollCtlDel(epollFd.intValue(), fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -288,6 +318,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
protected void run() {
|
||||
for (;;) {
|
||||
try {
|
||||
processPendingChannelFlags();
|
||||
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
|
||||
switch (strategy) {
|
||||
case SelectStrategy.CONTINUE:
|
||||
|
Loading…
Reference in New Issue
Block a user