[#5297] Ensure calling NioEventLoop.pendingTasks() and EpollEventLoop.pendingTasks() will not produce livelock
Motivation: SingleThreadEventExecutor.pendingTasks() will call taskQueue.size() to get the number of pending tasks in the queue. This is not safe when using MpscLinkedQueue as size() is only allowed to be called by a single consumer. Modifications: Ensure size() is only called from the EventLoop. Result: No more livelock possible when call pendingTasks, no matter from which thread it is done.
This commit is contained in:
parent
d0ae71f62c
commit
3f2287f5ec
@ -284,7 +284,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
||||
* <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
|
||||
* SingleThreadEventExecutor. So use it was care!</strong>
|
||||
*/
|
||||
public final int pendingTasks() {
|
||||
public int pendingTasks() {
|
||||
return taskQueue.size();
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
|
||||
/**
|
||||
@ -66,6 +67,13 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
||||
}
|
||||
};
|
||||
|
||||
private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
return EpollEventLoop.super.pendingTasks();
|
||||
}
|
||||
};
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private volatile int wakenUp;
|
||||
private volatile int ioRatio = 50;
|
||||
@ -168,6 +176,17 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
||||
return PlatformDependent.newMpscQueue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int pendingTasks() {
|
||||
// As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
|
||||
// otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
|
||||
// See https://github.com/netty/netty/issues/5297
|
||||
if (inEventLoop()) {
|
||||
return super.pendingTasks();
|
||||
} else {
|
||||
return submit(pendingTasksCallable).syncUninterruptibly().getNow();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Returns the percentage of the desired amount of time spent for I/O in the event loop.
|
||||
*/
|
||||
|
@ -42,6 +42,7 @@ import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@ -68,6 +69,12 @@ public final class NioEventLoop extends SingleThreadEventLoop {
|
||||
return selectNow();
|
||||
}
|
||||
};
|
||||
private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
return NioEventLoop.super.pendingTasks();
|
||||
}
|
||||
};
|
||||
|
||||
// Workaround for JDK NIO bug.
|
||||
//
|
||||
@ -184,6 +191,18 @@ public final class NioEventLoop extends SingleThreadEventLoop {
|
||||
return PlatformDependent.newMpscQueue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int pendingTasks() {
|
||||
// As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
|
||||
// otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
|
||||
// See https://github.com/netty/netty/issues/5297
|
||||
if (inEventLoop()) {
|
||||
return super.pendingTasks();
|
||||
} else {
|
||||
return submit(pendingTasksCallable).syncUninterruptibly().getNow();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
|
||||
* of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
|
||||
|
Loading…
x
Reference in New Issue
Block a user