Revert changes in EpollEventLoop that were done recently and did cause various problems in different testsuites.
Motivation: Changes that were done to the EpollEventLoop to optimize some things did break some testsuite and caused timeouts. We need to investigate to see why this is the case but for now we should just revert so we can do a release. Modifivations: - Partly revert 1fa7a5e697825bdd2f5ad7885b64749ede5c3192 and a22d4ba859b115d353b4cea1af581b987249adf6 Result: Testsuites pass again.
This commit is contained in:
parent
b409f8e7fa
commit
7f391426a2
@ -35,9 +35,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
import java.io.IOException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
|
||||
import static java.lang.Math.min;
|
||||
|
||||
@ -46,6 +44,8 @@ import static java.lang.Math.min;
|
||||
*/
|
||||
class EpollEventLoop extends SingleThreadEventLoop {
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
|
||||
private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER =
|
||||
AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");
|
||||
|
||||
static {
|
||||
// Ensure JNI is initialized by the time this class is loaded by this time!
|
||||
@ -53,16 +53,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
Epoll.ensureAvailability();
|
||||
}
|
||||
|
||||
/**
|
||||
* When in epollWait(), this mirrors the currently-set deadline of the timerFd. A negative value
|
||||
* means that the event loop is awake, which blocks rescheduling activity by other threads.
|
||||
* It is restored to the real timerFd expiry time again prior to entering epollWait().
|
||||
*
|
||||
* Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting
|
||||
* with the time source (e.g. calling System.nanoTime()) which can be expensive.
|
||||
*/
|
||||
private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L);
|
||||
private final AtomicInteger wakenUp = new AtomicInteger();
|
||||
// Pick a number that no task could have previously used.
|
||||
private long prevDeadlineNanos = nanoTime() - 1;
|
||||
private final FileDescriptor epollFd;
|
||||
private final FileDescriptor eventFd;
|
||||
private final FileDescriptor timerFd;
|
||||
@ -81,6 +73,12 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
return epollWaitNow();
|
||||
}
|
||||
};
|
||||
@SuppressWarnings("unused") // AtomicIntegerFieldUpdater
|
||||
private volatile int wakenUp;
|
||||
private volatile int ioRatio = 50;
|
||||
|
||||
// See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
|
||||
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
|
||||
|
||||
EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
|
||||
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
|
||||
@ -177,88 +175,9 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
return datagramPacketArray;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
|
||||
return false; // don't wake event loop
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
|
||||
try {
|
||||
trySetTimerFd(deadlineNanos);
|
||||
} catch (IOException e) {
|
||||
throw new RejectedExecutionException(e);
|
||||
}
|
||||
return false; // don't wake event loop
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean runAllTasks() {
|
||||
// This method is overridden to ensure that all the expired scheduled tasks are executed during shutdown, and
|
||||
// any other execute all scenarios in the base class.
|
||||
return runScheduledAndExecutorTasks(4);
|
||||
}
|
||||
|
||||
private void trySetTimerFd(long candidateNextDeadline) throws IOException {
|
||||
for (;;) {
|
||||
long nextDeadline = nextDeadlineNanos.get();
|
||||
if (nextDeadline <= candidateNextDeadline) {
|
||||
// This includes case where nextDeadline is negative (event loop is awake)
|
||||
return;
|
||||
}
|
||||
if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) {
|
||||
// We must serialize calls to setTimerFd to avoid the set of a later deadline
|
||||
// racing with a sooner one and overwriting it. A second check of nextDeadlineNanos
|
||||
// is made within the sync block to avoid having the CAS within the sync
|
||||
synchronized (nextDeadlineNanos) {
|
||||
nextDeadline = nextDeadlineNanos.get();
|
||||
if (nextDeadline == candidateNextDeadline ||
|
||||
(nextDeadline + Long.MAX_VALUE + 1) == candidateNextDeadline) {
|
||||
setTimerFd(deadlineToDelayNanos(candidateNextDeadline));
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setTimerFd(long candidateNextDelayNanos) throws IOException {
|
||||
if (candidateNextDelayNanos > 0) {
|
||||
final int delaySeconds = (int) min(candidateNextDelayNanos / 1000000000L, Integer.MAX_VALUE);
|
||||
final int delayNanos = (int) min(candidateNextDelayNanos - delaySeconds * 1000000000L, Integer.MAX_VALUE);
|
||||
Native.timerFdSetTime(timerFd.intValue(), delaySeconds, delayNanos);
|
||||
} else {
|
||||
// Setting the timer to 0, 0 will disarm it, so we have a few options:
|
||||
// 1. Set the timer wakeup to 1ns (1 system call).
|
||||
// 2. Use the eventFd to force a wakeup and disarm the timer (2 system calls).
|
||||
// For now we are using option (1) because there are less system calls, and we will correctly reset the
|
||||
// nextDeadlineNanos state when the EventLoop processes the timer wakeup.
|
||||
Native.timerFdSetTime(timerFd.intValue(), 0, 1);
|
||||
}
|
||||
}
|
||||
|
||||
private long checkScheduleTaskQueueForNewDelay(long timerFdDeadline) throws IOException {
|
||||
assert nextDeadlineNanos.get() < 0;
|
||||
final long nextTaskDeadlineNanos = nextScheduledTaskDeadlineNanos();
|
||||
if (nextTaskDeadlineNanos == -1 || nextTaskDeadlineNanos >= timerFdDeadline) {
|
||||
// Just restore to preexisting timerFd value, update not needed
|
||||
nextDeadlineNanos.lazySet(timerFdDeadline);
|
||||
} else {
|
||||
synchronized (nextDeadlineNanos) {
|
||||
// Shorter delay required than current timerFd setting, update it
|
||||
nextDeadlineNanos.lazySet(timerFdDeadline = nextTaskDeadlineNanos);
|
||||
setTimerFd(deadlineToDelayNanos(timerFdDeadline));
|
||||
}
|
||||
}
|
||||
return timerFdDeadline;
|
||||
// Don't disarm the timerFd even if there are no more queued tasks. Since we are setting timerFd from outside
|
||||
// the EventLoop it is possible that another thread has set the timer and we may miss a wakeup if we disarm
|
||||
// the timer here. Instead we wait for the timer wakeup on the EventLoop and clear state for the next timer.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void wakeup(boolean inEventLoop) {
|
||||
if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
|
||||
if (!inEventLoop && WAKEN_UP_UPDATER.getAndSet(this, 1) == 0) {
|
||||
// write to the evfd which will then wake-up epoll_wait(...)
|
||||
Native.eventFdWrite(eventFd.intValue(), 1L);
|
||||
}
|
||||
@ -318,21 +237,47 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the percentage of the desired amount of time spent for I/O in the event loop.
|
||||
*/
|
||||
public int getIoRatio() {
|
||||
return ioRatio;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
|
||||
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
|
||||
*/
|
||||
public void setIoRatio(int ioRatio) {
|
||||
if (ioRatio <= 0 || ioRatio > 100) {
|
||||
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
|
||||
}
|
||||
this.ioRatio = ioRatio;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int registeredChannels() {
|
||||
return channels.size();
|
||||
}
|
||||
|
||||
private int epollWait() throws IOException {
|
||||
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
|
||||
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
|
||||
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
|
||||
// in pipeline.
|
||||
return Native.epollWait(epollFd, events, hasTasks());
|
||||
int delaySeconds;
|
||||
int delayNanos;
|
||||
long curDeadlineNanos = deadlineNanos();
|
||||
if (curDeadlineNanos == prevDeadlineNanos) {
|
||||
delaySeconds = -1;
|
||||
delayNanos = -1;
|
||||
} else {
|
||||
long totalDelay = delayNanos(System.nanoTime());
|
||||
prevDeadlineNanos = curDeadlineNanos;
|
||||
delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
|
||||
delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
|
||||
}
|
||||
return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
|
||||
}
|
||||
|
||||
private int epollWaitNow() throws IOException {
|
||||
return Native.epollWait(epollFd, events, true);
|
||||
return Native.epollWait(epollFd, events, timerFd, 0, 0);
|
||||
}
|
||||
|
||||
private int epollBusyWait() throws IOException {
|
||||
@ -341,7 +286,6 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
|
||||
@Override
|
||||
protected void run() {
|
||||
long timerFdDeadline = Long.MAX_VALUE;
|
||||
for (;;) {
|
||||
try {
|
||||
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
|
||||
@ -354,38 +298,38 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
break;
|
||||
|
||||
case SelectStrategy.SELECT:
|
||||
if (wakenUp.get() == 1) {
|
||||
wakenUp.set(0);
|
||||
if (wakenUp == 1) {
|
||||
wakenUp = 0;
|
||||
}
|
||||
if (!hasTasks()) {
|
||||
// When we are in the EventLoop we don't bother setting the timerFd for each
|
||||
// scheduled task, but instead defer the processing until the end of the EventLoop
|
||||
// (next wait) to reduce the timerFd modifications.
|
||||
timerFdDeadline = checkScheduleTaskQueueForNewDelay(timerFdDeadline);
|
||||
try {
|
||||
strategy = epollWait();
|
||||
} finally {
|
||||
// This getAndAdd will change the raw value of nextDeadlineNanos to be negative
|
||||
// which will block any *new* timerFd mods by other threads while also "preserving"
|
||||
// its last value to avoid disrupting a possibly-concurrent setTimerFd call
|
||||
// (so that we can know the timerFd really did/will get updated to the read value).
|
||||
timerFdDeadline = nextDeadlineNanos.getAndAdd(Long.MAX_VALUE + 1);
|
||||
// The value of nextDeadlineNanos is now guaranteed to be negative
|
||||
}
|
||||
strategy = epollWait();
|
||||
}
|
||||
// fallthrough
|
||||
default:
|
||||
}
|
||||
|
||||
try {
|
||||
if (processReady(events, strategy)) {
|
||||
// Polled events include timerFd expiry; conservatively assume that no timer is set
|
||||
timerFdDeadline = Long.MAX_VALUE;
|
||||
final int ioRatio = this.ioRatio;
|
||||
if (ioRatio == 100) {
|
||||
try {
|
||||
if (strategy > 0) {
|
||||
processReady(events, strategy);
|
||||
}
|
||||
} finally {
|
||||
// Ensure we always run tasks.
|
||||
runAllTasks();
|
||||
}
|
||||
} else {
|
||||
final long ioStartTime = System.nanoTime();
|
||||
|
||||
try {
|
||||
if (strategy > 0) {
|
||||
processReady(events, strategy);
|
||||
}
|
||||
} finally {
|
||||
// Ensure we always run tasks.
|
||||
final long ioTime = System.nanoTime() - ioStartTime;
|
||||
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
|
||||
}
|
||||
} finally {
|
||||
runAllTasks();
|
||||
// No need to drainScheduledQueue() after the fact, because all in event loop scheduling results
|
||||
// in direct addition to the scheduled priority queue.
|
||||
}
|
||||
if (allowGrowing && strategy == events.length()) {
|
||||
//increase the size of the array as we needed the whole space for the events
|
||||
@ -439,17 +383,13 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if a timerFd event was encountered
|
||||
private boolean processReady(EpollEventArray events, int ready) {
|
||||
boolean timerFired = false;
|
||||
for (int i = 0; i < ready; ++i) {
|
||||
private void processReady(EpollEventArray events, int ready) {
|
||||
for (int i = 0; i < ready; i ++) {
|
||||
final int fd = events.fd(i);
|
||||
if (fd == eventFd.intValue()) {
|
||||
if (fd == eventFd.intValue() || fd == timerFd.intValue()) {
|
||||
// Just ignore as we use ET mode for the eventfd and timerfd.
|
||||
//
|
||||
// See also https://stackoverflow.com/a/12492308/1074097
|
||||
} else if (fd == timerFd.intValue()) {
|
||||
timerFired = true;
|
||||
} else {
|
||||
final long ev = events.events(i);
|
||||
|
||||
@ -503,7 +443,6 @@ class EpollEventLoop extends SingleThreadEventLoop {
|
||||
}
|
||||
}
|
||||
}
|
||||
return timerFired;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user