From 86f53083e75a525ebd9f53c863da596fbd6c9961 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 23 May 2016 08:59:59 +0200 Subject: [PATCH] [#5297] Ensure calling NioEventLoop.pendingTasks() and EpollEventLoop.pendingTasks() will not produce livelock Motivation: SingleThreadEventExecutor.pendingTasks() will call taskQueue.size() to get the number of pending tasks in the queue. This is not safe when using MpscLinkedQueue as size() is only allowed to be called by a single consumer. Modifications: Ensure size() is only called from the EventLoop. Result: No more livelock possible when call pendingTasks, no matter from which thread it is done. --- .../concurrent/SingleThreadEventExecutor.java | 2 +- .../netty/channel/epoll/EpollEventLoop.java | 19 ++++++++++++++++++- .../io/netty/channel/nio/NioEventLoop.java | 19 +++++++++++++++++++ 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 2fb8578926..e4039cf5ed 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -265,7 +265,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx * Be aware that this operation may be expensive as it depends on the internal implementation of the * SingleThreadEventExecutor. So use it was care! */ - public final int pendingTasks() { + public int pendingTasks() { return taskQueue.size(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 1e0ac80cf8..b6e3483519 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -65,7 +66,12 @@ final class EpollEventLoop extends SingleThreadEventLoop { return Native.epollWait(epollFd.intValue(), events, 0); } }; - + private final Callable pendingTasksCallable = new Callable() { + @Override + public Integer call() throws Exception { + return EpollEventLoop.super.pendingTasks(); + } + }; private volatile int wakenUp; private volatile int ioRatio = 50; @@ -167,6 +173,17 @@ final class EpollEventLoop extends SingleThreadEventLoop { return PlatformDependent.newMpscQueue(); } + @Override + public int pendingTasks() { + // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as + // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer). + // See https://github.com/netty/netty/issues/5297 + if (inEventLoop()) { + return super.pendingTasks(); + } else { + return submit(pendingTasksCallable).syncUninterruptibly().getNow(); + } + } /** * Returns the percentage of the desired amount of time spent for I/O in the event loop. */ diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index b06b5bf630..806dcccd88 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -40,6 +40,7 @@ import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.Queue; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,6 +68,12 @@ public final class NioEventLoop extends SingleThreadEventLoop { return selectNow(); } }; + private final Callable pendingTasksCallable = new Callable() { + @Override + public Integer call() throws Exception { + return NioEventLoop.super.pendingTasks(); + } + }; // Workaround for JDK NIO bug. // @@ -183,6 +190,18 @@ public final class NioEventLoop extends SingleThreadEventLoop { return PlatformDependent.newMpscQueue(); } + @Override + public int pendingTasks() { + // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as + // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer). + // See https://github.com/netty/netty/issues/5297 + if (inEventLoop()) { + return super.pendingTasks(); + } else { + return submit(pendingTasksCallable).syncUninterruptibly().getNow(); + } + } + /** * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector} * of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will