Merge pull request #20 from normanmaurer/timeout_fix
Correctly implement IOUringSubmissionQueue.addTimeout(...) and ensure…
This commit is contained in:
commit
b4e7f046d5
@ -117,7 +117,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
|||||||
submissionQueue.addPollIn(eventfd.intValue());
|
submissionQueue.addPollIn(eventfd.intValue());
|
||||||
submissionQueue.submit();
|
submissionQueue.submit();
|
||||||
|
|
||||||
for (; ; ) {
|
for (;;) {
|
||||||
logger.trace("Run IOUringEventLoop {}", this.toString());
|
logger.trace("Run IOUringEventLoop {}", this.toString());
|
||||||
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
|
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
|
||||||
if (curDeadlineNanos == -1L) {
|
if (curDeadlineNanos == -1L) {
|
||||||
@ -152,9 +152,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
|||||||
|
|
||||||
completionQueue.process(this);
|
completionQueue.process(this);
|
||||||
|
|
||||||
if (hasTasks()) {
|
// Always call runAllTasks() as it will also fetch the scheduled tasks that are ready.
|
||||||
runAllTasks();
|
runAllTasks();
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (isShuttingDown()) {
|
if (isShuttingDown()) {
|
||||||
@ -169,6 +168,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean handle(int fd, int res, long flags, int op, int pollMask) {
|
public boolean handle(int fd, int res, long flags, int op, int pollMask) {
|
||||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||||
|
@ -22,6 +22,9 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
|||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import static java.lang.Math.max;
|
||||||
|
import static java.lang.Math.min;
|
||||||
|
|
||||||
final class IOUringSubmissionQueue {
|
final class IOUringSubmissionQueue {
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringSubmissionQueue.class);
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringSubmissionQueue.class);
|
||||||
|
|
||||||
@ -329,7 +332,6 @@ final class IOUringSubmissionQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setTimeout(long timeoutNanoSeconds) {
|
private void setTimeout(long timeoutNanoSeconds) {
|
||||||
long seconds, nanoSeconds;
|
long seconds, nanoSeconds;
|
||||||
|
|
||||||
@ -338,8 +340,8 @@ final class IOUringSubmissionQueue {
|
|||||||
seconds = 0;
|
seconds = 0;
|
||||||
nanoSeconds = 0;
|
nanoSeconds = 0;
|
||||||
} else {
|
} else {
|
||||||
seconds = timeoutNanoSeconds / 1000000000L;
|
seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
|
||||||
nanoSeconds = timeoutNanoSeconds % 1000;
|
nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
|
PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
|
||||||
|
@ -18,6 +18,8 @@ package io.netty.channel.uring;
|
|||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class IOUringEventLoopTest {
|
public class IOUringEventLoopTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -50,4 +52,21 @@ public class IOUringEventLoopTest {
|
|||||||
group.shutdownGracefully();
|
group.shutdownGracefully();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedule() throws Exception {
|
||||||
|
IOUringEventLoopGroup group = new IOUringEventLoopGroup(1);
|
||||||
|
try {
|
||||||
|
EventLoop loop = group.next();
|
||||||
|
loop.schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}, 1, TimeUnit.SECONDS).sync();
|
||||||
|
} finally {
|
||||||
|
group.shutdownGracefully();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,8 +18,6 @@ package io.netty.channel.uring;
|
|||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||||
import io.netty.testsuite.transport.socket.SocketConnectionAttemptTest;
|
import io.netty.testsuite.transport.socket.SocketConnectionAttemptTest;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -28,11 +26,4 @@ public class IOUringSocketConnectionAttemptTest extends SocketConnectionAttemptT
|
|||||||
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
|
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
|
||||||
return IOUringSocketTestPermutation.INSTANCE.clientSocket();
|
return IOUringSocketTestPermutation.INSTANCE.clientSocket();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("FIX ME")
|
|
||||||
@Test
|
|
||||||
@Override
|
|
||||||
public void testConnectTimeout() throws Throwable {
|
|
||||||
super.testConnectTimeout();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user