Correctly implement IOUringSubmissionQueue.addTimeout(...) and ensure we always call runAllTasks()
Motivation: We did have a bug in how we calculated the values for the timespec which lead to incorrect wakeups. Beside this we also missed to always call runAllTasks() which is needed to fetch the ready to be executed scheduled tasks. Modifications: - Fix timespec setup - Always call runAllTasks() - Add extra testcase - Remove @Ignore from previous failing test Result: Timeouts work as expected
This commit is contained in:
parent
77a133344f
commit
0a0cc8a7c0
@ -117,7 +117,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
submissionQueue.addPollIn(eventfd.intValue());
|
||||
submissionQueue.submit();
|
||||
|
||||
for (; ; ) {
|
||||
for (;;) {
|
||||
logger.trace("Run IOUringEventLoop {}", this.toString());
|
||||
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
|
||||
if (curDeadlineNanos == -1L) {
|
||||
@ -152,9 +152,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
|
||||
completionQueue.process(this);
|
||||
|
||||
if (hasTasks()) {
|
||||
runAllTasks();
|
||||
}
|
||||
// Always call runAllTasks() as it will also fetch the scheduled tasks that are ready.
|
||||
runAllTasks();
|
||||
|
||||
try {
|
||||
if (isShuttingDown()) {
|
||||
@ -169,6 +168,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public boolean handle(int fd, int res, long flags, int op, int pollMask) {
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
|
@ -22,6 +22,9 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
final class IOUringSubmissionQueue {
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringSubmissionQueue.class);
|
||||
|
||||
@ -329,7 +332,6 @@ final class IOUringSubmissionQueue {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setTimeout(long timeoutNanoSeconds) {
|
||||
long seconds, nanoSeconds;
|
||||
|
||||
@ -338,8 +340,8 @@ final class IOUringSubmissionQueue {
|
||||
seconds = 0;
|
||||
nanoSeconds = 0;
|
||||
} else {
|
||||
seconds = timeoutNanoSeconds / 1000000000L;
|
||||
nanoSeconds = timeoutNanoSeconds % 1000;
|
||||
seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
|
||||
nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
|
||||
}
|
||||
|
||||
PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
|
||||
|
@ -18,6 +18,8 @@ package io.netty.channel.uring;
|
||||
import io.netty.channel.EventLoop;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class IOUringEventLoopTest {
|
||||
|
||||
@Test
|
||||
@ -50,4 +52,21 @@ public class IOUringEventLoopTest {
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchedule() throws Exception {
|
||||
IOUringEventLoopGroup group = new IOUringEventLoopGroup(1);
|
||||
try {
|
||||
EventLoop loop = group.next();
|
||||
loop.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS).sync();
|
||||
} finally {
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -18,8 +18,6 @@ package io.netty.channel.uring;
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.SocketConnectionAttemptTest;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -28,11 +26,4 @@ public class IOUringSocketConnectionAttemptTest extends SocketConnectionAttemptT
|
||||
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
|
||||
return IOUringSocketTestPermutation.INSTANCE.clientSocket();
|
||||
}
|
||||
|
||||
@Ignore("FIX ME")
|
||||
@Test
|
||||
@Override
|
||||
public void testConnectTimeout() throws Throwable {
|
||||
super.testConnectTimeout();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user