Allow to set max capacity for task queue for EventExecutors and EventLoops

Motivation:

To restrict the memory usage of a system it is sometimes needed to adjust the number of max pending tasks in the tasks queue.

Modifications:

- Add new constructors to modify the number of allowed pending tasks.
- Add system properties to configure the default values.

Result:

More flexible configuration.
This commit is contained in:
Norman Maurer 2016-06-21 16:59:46 +02:00
parent fc85eb34ce
commit 7ed8364a7b
7 changed files with 82 additions and 13 deletions

View File

@ -28,6 +28,10 @@ final class DefaultEventExecutor extends SingleThreadEventExecutor {
super(parent, threadFactory, true); super(parent, threadFactory, true);
} }
DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory, int maxPendingTasks) {
super(parent, threadFactory, true, maxPendingTasks);
}
@Override @Override
protected void run() { protected void run() {
for (;;) { for (;;) {

View File

@ -22,7 +22,6 @@ import java.util.concurrent.ThreadFactory;
* to handle the tasks. * to handle the tasks.
*/ */
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup { public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
/** /**
* @see {@link #DefaultEventExecutorGroup(int, ThreadFactory)} * @see {@link #DefaultEventExecutorGroup(int, ThreadFactory)}
*/ */
@ -37,12 +36,23 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
*/ */
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) { public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory); this(nThreads, threadFactory, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_TASKS);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
*/
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory, int maxPendingTasks) {
super(nThreads, threadFactory, maxPendingTasks);
} }
@Override @Override
protected EventExecutor newChild( protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception { ThreadFactory threadFactory, Object... args) throws Exception {
return new DefaultEventExecutor(this, threadFactory); return new DefaultEventExecutor(this, threadFactory, (Integer) args[0]);
} }
} }

View File

@ -16,6 +16,7 @@
package io.netty.util.concurrent; package io.netty.util.concurrent;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -39,6 +40,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
*/ */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor { public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor {
static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
@ -73,6 +77,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private final Semaphore threadLock = new Semaphore(0); private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>(); private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp; private final boolean addTaskWakesUp;
private final int maxPendingTasks;
private long lastExecutionTime; private long lastExecutionTime;
@ -95,7 +100,21 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
*/ */
protected SingleThreadEventExecutor( protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS);
}
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread}
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
*/
@SuppressWarnings("deprecation")
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks) {
if (threadFactory == null) { if (threadFactory == null) {
throw new NullPointerException("threadFactory"); throw new NullPointerException("threadFactory");
} }
@ -155,17 +174,26 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
} }
}); });
threadProperties = new DefaultThreadProperties(thread); threadProperties = new DefaultThreadProperties(thread);
this.maxPendingTasks = Math.max(16, maxPendingTasks);
taskQueue = newTaskQueue(); taskQueue = newTaskQueue();
} }
/**
* @deprecated Please use and override {@link #newTaskQueue(int)}.
*/
@Deprecated
protected Queue<Runnable> newTaskQueue() {
return newTaskQueue(maxPendingTasks);
}
/** /**
* Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
* {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
* calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
* implementation that does not support blocking operations at all. * implementation that does not support blocking operations at all.
*/ */
protected Queue<Runnable> newTaskQueue() { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(); return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
} }
@Override @Override

View File

@ -83,7 +83,12 @@ public final class PlatformDependent {
private static final boolean DIRECT_BUFFER_PREFERRED = private static final boolean DIRECT_BUFFER_PREFERRED =
HAS_UNSAFE && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false); HAS_UNSAFE && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
private static final long MAX_DIRECT_MEMORY = maxDirectMemory0(); private static final long MAX_DIRECT_MEMORY = maxDirectMemory0();
private static final int MAX_MPSC_CAPACITY = 1024 * 1024; // TODO: Maybe make this configurable ? private static final int MPSC_CHUNK_SIZE = 1024;
private static final int MIN_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * 2;
private static final int DEFAULT_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * MPSC_CHUNK_SIZE;
// This is currently the maximal allowed capacity in JCTools.
// See https://github.com/JCTools/JCTools/issues/115
private static final int MAX_ALLOWED_MPSC_CAPACITY = Integer.MAX_VALUE >> 2;
private static final long ARRAY_BASE_OFFSET = arrayBaseOffset0(); private static final long ARRAY_BASE_OFFSET = arrayBaseOffset0();
@ -586,7 +591,20 @@ public final class PlatformDependent {
* consumer (one thread!). * consumer (one thread!).
*/ */
public static <T> Queue<T> newMpscQueue() { public static <T> Queue<T> newMpscQueue() {
return hasUnsafe() ? new MpscChunkedArrayQueue<T>(1024, MAX_MPSC_CAPACITY, true) return newMpscQueue(DEFAULT_MAX_MPSC_CAPACITY);
}
/**
* Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single
* consumer (one thread!).
*/
public static <T> Queue<T> newMpscQueue(int maxCapacity) {
return hasUnsafe() ?
new MpscChunkedArrayQueue<T>(MPSC_CHUNK_SIZE,
// Calculate the max capacity which can not be bigger then MAX_ALLOWED_MPSC_CAPACITY.
// This is forced by the MpscChunkedArrayQueue implementation as will try to round it
// up to the next power of two and so will overflow otherwise.
Math.max(Math.min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY), MIN_MAX_MPSC_CAPACITY), true)
: new MpscLinkedAtomicQueue<T>(); : new MpscLinkedAtomicQueue<T>();
} }

View File

@ -137,7 +137,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
} }
/** /**
* Register the given epoll with this {@link io.netty.channel.EventLoop}. * Register the given epoll with this {@link EventLoop}.
*/ */
void add(AbstractEpollChannel ch) throws IOException { void add(AbstractEpollChannel ch) throws IOException {
assert inEventLoop(); assert inEventLoop();
@ -171,9 +171,9 @@ final class EpollEventLoop extends SingleThreadEventLoop {
} }
@Override @Override
protected Queue<Runnable> newTaskQueue() { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask() // This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(); return PlatformDependent.newMpscQueue(maxPendingTasks);
} }
@Override @Override

View File

@ -17,6 +17,7 @@ package io.netty.channel;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.SystemPropertyUtil;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -26,11 +27,19 @@ import java.util.concurrent.ThreadFactory;
*/ */
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
/** /**
* @see {@link SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, boolean)} * @see {@link SingleThreadEventExecutor#SingleThreadEventExecutor(EventExecutorGroup, ThreadFactory, boolean)}
*/ */
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp); this(parent, threadFactory, addTaskWakesUp, MAX_PENDING_TASKS);
}
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks);
} }
@Override @Override

View File

@ -193,9 +193,9 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
@Override @Override
protected Queue<Runnable> newTaskQueue() { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask() // This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(); return PlatformDependent.newMpscQueue(maxPendingTasks);
} }
@Override @Override