Close IOUringEventLoop wakeup/eventfd close shutdown race (#10615)
Motivation There is a race condition when shutting down the event loop where the eventFd write performed in the wakeup() method may actually hit a different fd if it's closed and reassigned in the meantime. This was already encountered and addressed in the epoll case. Modifications Similar to what's done for epoll, in IOUringEventLoop: - Reinstate pendingWakeup flag which tracks when there is a wakeup pending (CAS of nextWakeupNanos performed by other thread in the wakeup() method) - Add logic to the cleanup() method to wait for corresponding READ CQE before closing the eventFd - Remove unused fields from IOUringCompletionQueue (cleanup) Result No event loop shutdown race
This commit is contained in:
parent
db73538737
commit
f6c84541be
@ -33,9 +33,6 @@ final class IOUringCompletionQueue {
|
||||
//these unsigned integer pointers(shared with the kernel) will be changed by the kernel
|
||||
private final long kHeadAddress;
|
||||
private final long kTailAddress;
|
||||
private final long kRingMaskAddress;
|
||||
private final long kRingEntriesAddress;
|
||||
private final long kOverflowAddress;
|
||||
|
||||
private final long completionQueueArrayAddress;
|
||||
|
||||
@ -51,9 +48,6 @@ final class IOUringCompletionQueue {
|
||||
int ringFd) {
|
||||
this.kHeadAddress = kHeadAddress;
|
||||
this.kTailAddress = kTailAddress;
|
||||
this.kRingMaskAddress = kRingMaskAddress;
|
||||
this.kRingEntriesAddress = kRingEntriesAddress;
|
||||
this.kOverflowAddress = kOverflowAddress;
|
||||
this.completionQueueArrayAddress = completionQueueArrayAddress;
|
||||
this.ringSize = ringSize;
|
||||
this.ringAddress = ringAddress;
|
||||
@ -110,16 +104,10 @@ final class IOUringCompletionQueue {
|
||||
/**
|
||||
* Block until there is at least one completion ready to be processed.
|
||||
*/
|
||||
boolean ioUringWaitCqe() {
|
||||
//IORING_ENTER_GETEVENTS -> wait until an event is completely processed
|
||||
void ioUringWaitCqe() {
|
||||
int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
|
||||
if (ret < 0) {
|
||||
//Todo throw exception!
|
||||
return false;
|
||||
} else if (ret == 0) {
|
||||
return true;
|
||||
throw new RuntimeException("ioUringEnter syscall returned " + ret);
|
||||
}
|
||||
//Todo throw Exception!
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import io.netty.channel.SingleThreadEventLoop;
|
||||
import io.netty.channel.unix.Errors;
|
||||
import io.netty.channel.unix.FileDescriptor;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.channel.uring.IOUringCompletionQueue.IOUringCompletionQueueCallback;
|
||||
import io.netty.util.collection.IntObjectHashMap;
|
||||
import io.netty.util.collection.IntObjectMap;
|
||||
import io.netty.util.concurrent.RejectedExecutionHandler;
|
||||
@ -32,8 +33,7 @@ import java.util.Queue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
IOUringCompletionQueue.IOUringCompletionQueueCallback {
|
||||
final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCompletionQueueCallback {
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class);
|
||||
|
||||
private final long eventfdReadBuf = PlatformDependent.allocateMemory(8);
|
||||
@ -57,6 +57,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
private final byte[] inet6AddressArray = new byte[16];
|
||||
|
||||
private long prevDeadlineNanos = NONE;
|
||||
private boolean pendingWakeup;
|
||||
|
||||
IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, boolean ioseqAsync,
|
||||
RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
|
||||
@ -167,7 +168,9 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
nextWakeupNanos.set(AWAKE);
|
||||
if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
|
||||
pendingWakeup = true;
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
handleLoopException(t);
|
||||
@ -220,9 +223,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int data) {
|
||||
if (op == Native.IORING_OP_READ && eventfd.intValue() == fd) {
|
||||
if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
|
||||
addEventFdRead(ringBuffer.ioUringSubmissionQueue());
|
||||
}
|
||||
pendingWakeup = false;
|
||||
addEventFdRead(ringBuffer.ioUringSubmissionQueue());
|
||||
} else if (op == Native.IORING_OP_TIMEOUT) {
|
||||
if (res == Native.ERRNO_ETIME_NEGATIVE) {
|
||||
prevDeadlineNanos = NONE;
|
||||
@ -288,6 +290,25 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
|
||||
@Override
|
||||
protected void cleanup() {
|
||||
if (pendingWakeup) {
|
||||
// Another thread is in the process of writing to the eventFd. We must wait to
|
||||
// receive the corresponding CQE before closing it or else the fd int may be
|
||||
// reassigned by the kernel in the meantime.
|
||||
IOUringCompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
|
||||
IOUringCompletionQueueCallback callback = new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int data) {
|
||||
if (op == Native.IORING_OP_READ && eventfd.intValue() == fd) {
|
||||
pendingWakeup = false;
|
||||
}
|
||||
}
|
||||
};
|
||||
completionQueue.process(callback);
|
||||
while (pendingWakeup) {
|
||||
completionQueue.ioUringWaitCqe();
|
||||
completionQueue.process(callback);
|
||||
}
|
||||
}
|
||||
try {
|
||||
eventfd.close();
|
||||
} catch (IOException e) {
|
||||
|
@ -58,7 +58,7 @@ public class NativeTest {
|
||||
writeEventByteBuf.readerIndex(), writeEventByteBuf.writerIndex()));
|
||||
submissionQueue.submit();
|
||||
|
||||
assertTrue(completionQueue.ioUringWaitCqe());
|
||||
completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
@ -72,7 +72,7 @@ public class NativeTest {
|
||||
readEventByteBuf.writerIndex(), readEventByteBuf.capacity()));
|
||||
submissionQueue.submit();
|
||||
|
||||
assertTrue(completionQueue.ioUringWaitCqe());
|
||||
completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
@ -103,7 +103,7 @@ public class NativeTest {
|
||||
Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertTrue(completionQueue.ioUringWaitCqe());
|
||||
completionQueue.ioUringWaitCqe();
|
||||
try {
|
||||
completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
@ -152,7 +152,7 @@ public class NativeTest {
|
||||
}
|
||||
}.start();
|
||||
|
||||
assertTrue(completionQueue.ioUringWaitCqe());
|
||||
completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
@ -183,7 +183,7 @@ public class NativeTest {
|
||||
Thread waitingCqe = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertTrue(completionQueue.ioUringWaitCqe());
|
||||
completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
@ -251,7 +251,7 @@ public class NativeTest {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
assertTrue(completionQueue.ioUringWaitCqe());
|
||||
completionQueue.ioUringWaitCqe();
|
||||
assertEquals(2, completionQueue.process(verifyCallback));
|
||||
} catch (AssertionError error) {
|
||||
errorRef.set(error);
|
||||
|
Loading…
Reference in New Issue
Block a user