Allow to set max capacity for task queue for EventExecutors and EventLoops

Motivation:

To restrict the memory usage of a system it is sometimes needed to adjust the number of max pending tasks in the tasks queue.

Modifications:

- Add new constructors to modify the number of allowed pending tasks.
- Add system properties to configure the default values.

Result:

More flexible configuration.
This commit is contained in:
Norman Maurer 2016-06-21 16:59:46 +02:00
parent 77c6d7a672
commit a0d4fb16fc
7 changed files with 104 additions and 14 deletions

View File

@ -48,6 +48,10 @@ public final class DefaultEventExecutor extends SingleThreadEventExecutor {
super(parent, executor, true); super(parent, executor, true);
} }
public DefaultEventExecutor(EventExecutorGroup parent, Executor executor, int maxPendingTasks) {
super(parent, executor, true, maxPendingTasks);
}
@Override @Override
protected void run() { protected void run() {
for (;;) { for (;;) {

View File

@ -23,7 +23,6 @@ import java.util.concurrent.ThreadFactory;
* to handle the tasks. * to handle the tasks.
*/ */
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup { public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
/** /**
* @see {@link #DefaultEventExecutorGroup(int, ThreadFactory)} * @see {@link #DefaultEventExecutorGroup(int, ThreadFactory)}
*/ */
@ -38,11 +37,22 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
*/ */
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) { public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory); this(nThreads, threadFactory, DefaultEventExecutor.DEFAULT_MAX_PENDING_TASKS);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
*/
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks) {
super(nThreads, threadFactory, maxPendingTasks);
} }
@Override @Override
protected EventExecutor newChild(Executor executor, Object... args) throws Exception { protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventExecutor(this, executor); return new DefaultEventExecutor(this, executor, (Integer) args[0]);
} }
} }

View File

@ -16,6 +16,7 @@
package io.netty.util.concurrent; package io.netty.util.concurrent;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -41,6 +42,9 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
*/ */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor { public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor {
static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
@ -86,6 +90,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private final Queue<Runnable> taskQueue; private final Queue<Runnable> taskQueue;
private volatile Thread thread; private volatile Thread thread;
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties; private volatile ThreadProperties threadProperties;
private final Executor executor; private final Executor executor;
private volatile boolean interrupted; private volatile boolean interrupted;
@ -93,6 +98,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private final Semaphore threadLock = new Semaphore(0); private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>(); private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp; private final boolean addTaskWakesUp;
private final int maxPendingTasks;
private long lastExecutionTime; private long lastExecutionTime;
@ -118,6 +124,20 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp); this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
} }
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread}
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
*/
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks);
}
/** /**
* Create a new instance * Create a new instance
* *
@ -127,6 +147,21 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
* executor thread * executor thread
*/ */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS);
}
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param executor the {@link Executor} which will be used for executing
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
*/
@SuppressWarnings("deprecation")
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks) {
super(parent); super(parent);
if (executor == null) { if (executor == null) {
@ -135,17 +170,26 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
this.addTaskWakesUp = addTaskWakesUp; this.addTaskWakesUp = addTaskWakesUp;
this.executor = executor; this.executor = executor;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
taskQueue = newTaskQueue(); taskQueue = newTaskQueue();
} }
/**
* @deprecated Please use and override {@link #newTaskQueue(int)}.
*/
@Deprecated
protected Queue<Runnable> newTaskQueue() {
return newTaskQueue(maxPendingTasks);
}
/** /**
* Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
* {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
* calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
* implementation that does not support blocking operations at all. * implementation that does not support blocking operations at all.
*/ */
protected Queue<Runnable> newTaskQueue() { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(); return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
} }
/** /**

View File

@ -87,7 +87,12 @@ public final class PlatformDependent {
private static final boolean DIRECT_BUFFER_PREFERRED = private static final boolean DIRECT_BUFFER_PREFERRED =
HAS_UNSAFE && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false); HAS_UNSAFE && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
private static final long MAX_DIRECT_MEMORY = maxDirectMemory0(); private static final long MAX_DIRECT_MEMORY = maxDirectMemory0();
private static final int MAX_MPSC_CAPACITY = 1024 * 1024; // TODO: Maybe make this configurable ? private static final int MPSC_CHUNK_SIZE = 1024;
private static final int MIN_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * 2;
private static final int DEFAULT_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * MPSC_CHUNK_SIZE;
// This is currently the maximal allowed capacity in JCTools.
// See https://github.com/JCTools/JCTools/issues/115
private static final int MAX_ALLOWED_MPSC_CAPACITY = Integer.MAX_VALUE >> 2;
private static final long BYTE_ARRAY_BASE_OFFSET = PlatformDependent0.byteArrayBaseOffset(); private static final long BYTE_ARRAY_BASE_OFFSET = PlatformDependent0.byteArrayBaseOffset();
@ -727,7 +732,20 @@ public final class PlatformDependent {
* consumer (one thread!). * consumer (one thread!).
*/ */
public static <T> Queue<T> newMpscQueue() { public static <T> Queue<T> newMpscQueue() {
return hasUnsafe() ? new MpscChunkedArrayQueue<T>(1024, MAX_MPSC_CAPACITY, true) return newMpscQueue(DEFAULT_MAX_MPSC_CAPACITY);
}
/**
* Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single
* consumer (one thread!).
*/
public static <T> Queue<T> newMpscQueue(int maxCapacity) {
return hasUnsafe() ?
new MpscChunkedArrayQueue<T>(MPSC_CHUNK_SIZE,
// Calculate the max capacity which can not be bigger then MAX_ALLOWED_MPSC_CAPACITY.
// This is forced by the MpscChunkedArrayQueue implementation as will try to round it
// up to the next power of two and so will overflow otherwise.
Math.max(Math.min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY), MIN_MAX_MPSC_CAPACITY), true)
: new MpscLinkedAtomicQueue<T>(); : new MpscLinkedAtomicQueue<T>();
} }

View File

@ -134,7 +134,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
} }
/** /**
* Register the given epoll with this {@link io.netty.channel.EventLoop}. * Register the given epoll with this {@link EventLoop}.
*/ */
void add(AbstractEpollChannel ch) throws IOException { void add(AbstractEpollChannel ch) throws IOException {
assert inEventLoop(); assert inEventLoop();
@ -168,9 +168,9 @@ final class EpollEventLoop extends SingleThreadEventLoop {
} }
@Override @Override
protected Queue<Runnable> newTaskQueue() { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask() // This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(); return PlatformDependent.newMpscQueue(maxPendingTasks);
} }
@Override @Override

View File

@ -17,6 +17,7 @@ package io.netty.channel;
import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.SystemPropertyUtil;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -27,12 +28,25 @@ import java.util.concurrent.ThreadFactory;
*/ */
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp); this(parent, threadFactory, addTaskWakesUp, MAX_PENDING_TASKS);
} }
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) { protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp); this(parent, executor, addTaskWakesUp, MAX_PENDING_TASKS);
}
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks);
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks) {
super(parent, executor, addTaskWakesUp, maxPendingTasks);
} }
@Override @Override

View File

@ -192,9 +192,9 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
@Override @Override
protected Queue<Runnable> newTaskQueue() { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask() // This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(); return PlatformDependent.newMpscQueue(maxPendingTasks);
} }
@Override @Override