Cleanup IOUringEventLoopGroup construction and allow to specify the ring (#10563)

Motivation:

We should allow to specify the ringsize when constructing the
IOUringEventLoopGroup and also be constistent with the rest of the
EventLoopGroup implementations

Modifications:

- Cleanup constructors
- Make ringSize configurable

Result:

Cleaner code and more flexible in terms of configuration
This commit is contained in:
Norman Maurer 2020-09-11 08:34:57 +02:00 committed by GitHub
parent d990b99a6b
commit 8e86bf60a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 46 additions and 82 deletions

View File

@ -15,13 +15,14 @@
*/
package io.netty.channel.uring;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.IovArray;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -55,15 +56,17 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
private long prevDeadlineNanos = NONE;
private boolean pendingWakeup;
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp);
IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize,
RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
// Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
IOUring.ensureAvailability();
// TODO: Let's hard code this to 8 IovArrays to keep the memory overhead kind of small. We may want to consider
// allow to change this in the future.
iovArrays = new IovArrays(8);
ringBuffer = Native.createRingBuffer(new Runnable() {
ringBuffer = Native.createRingBuffer(ringSize, new Runnable() {
@Override
public void run() {
// Once we submitted its safe to clear the IovArrays and so be able to re-use these.
@ -75,6 +78,14 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
logger.trace("New EventLoop: {}", this.toString());
}
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return newTaskQueue0(maxPendingTasks);

View File

@ -15,14 +15,15 @@
*/
package io.netty.channel.uring;
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.DefaultEventExecutorChooserFactory;
import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import io.netty.util.internal.ObjectUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
@ -46,105 +47,58 @@ public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads and the given {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public IOUringEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, 0);
}
/**
* Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public IOUringEventLoopGroup(int nThreads, SelectStrategyFactory selectStrategyFactory) {
this(nThreads, (ThreadFactory) null, selectStrategyFactory);
}
/**
* Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, 0);
}
public IOUringEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, DefaultSelectStrategyFactory.INSTANCE);
}
/**
* Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory,
SelectStrategyFactory selectStrategyFactory) {
this(nThreads, threadFactory, 0, selectStrategyFactory);
this(nThreads, executor, 0);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...).
*
* @deprecated Use {@link #IOUringEventLoopGroup(int)} or {@link #IOUringEventLoopGroup(int, ThreadFactory)}
* maximal size of the used ringbuffer.
*/
@Deprecated
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) {
this(nThreads, threadFactory, maxEventsAtOnce, DefaultSelectStrategyFactory.INSTANCE);
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int ringSize) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), ringSize);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...).
*
* @deprecated Use {@link #IOUringEventLoopGroup(int)}, {@link #IOUringEventLoopGroup(int, ThreadFactory)}, or
* {@link #IOUringEventLoopGroup(int, SelectStrategyFactory)}
*/
@Deprecated
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce,
SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, maxEventsAtOnce, selectStrategyFactory, RejectedExecutionHandlers.reject());
public IOUringEventLoopGroup(int nThreads, Executor executor, int ringsize) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE,
ringsize, RejectedExecutionHandlers.reject());
}
public IOUringEventLoopGroup(int nThreads, Executor executor, SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, 0, selectStrategyFactory, RejectedExecutionHandlers.reject());
private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
int ringSize, RejectedExecutionHandler rejectedExecutionHandler) {
this(nThreads, executor, chooserFactory, ringSize, rejectedExecutionHandler, null);
}
public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
SelectStrategyFactory selectStrategyFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler);
}
public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
SelectStrategyFactory selectStrategyFactory,
RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(nThreads, executor, chooserFactory, 0,
selectStrategyFactory, rejectedExecutionHandler, queueFactory);
}
/**
* @deprecated This method will be removed in future releases, and is not guaranteed to have any impacts.
*/
@Deprecated
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
int ringSize, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(nThreads, executor, chooserFactory, ringSize, rejectedExecutionHandler, queueFactory);
}
//Todo
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//EventLoopTaskQueueFactory queueFactory = args.length == 4? (EventLoopTaskQueueFactory) args[3] : null;
// return new IOUringEventLoop(this, executor, (Integer) args[0],
// ((SelectStrategyFactory) args[1]).newSelectStrategy(),
// (RejectedExecutionHandler) args[2], queueFactory);
return new IOUringEventLoop(this, executor, false);
protected EventLoop newChild(Executor executor, Object... args) {
if (args.length != 3) {
throw new IllegalArgumentException("Illegal amount of extra arguments");
}
int ringSize = (Integer) args[0];
ObjectUtil.checkPositiveOrZero(ringSize, "ringSize");
if (ringSize == 0) {
ringSize = Native.DEFAULT_RING_SIZE;
}
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[1];
EventLoopTaskQueueFactory taskQueueFactory = (EventLoopTaskQueueFactory) args[2];
return new IOUringEventLoop(this, executor, ringSize, rejectedExecutionHandler, taskQueueFactory);
}
}

View File

@ -30,10 +30,9 @@ import java.util.Locale;
final class Native {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);
// Todo expose this via the EventLoopGroup constructor as well.
private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096);
static final int DEFAULT_RING_SIZE = Math.max(64, SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096));
static {
static {
Selector selector = null;
try {
// We call Selector.open() as this will under the hood cause IOUtil to be loaded.