Correctly calculate timeout for io_uring

Motivation:

We need to use deadlineToDelayNanos(...) to calculate the timeout for io_uring as otherwise the timeout will be scheduled at the wrong time in the future

Modifications:

Make use of deadlineToDelayNanos(...)

Result:

Correctly schedule timeou
This commit is contained in:
Norman Maurer 2020-08-25 14:27:33 +02:00
parent b62668d1d0
commit 16530998a3

View File

@ -62,7 +62,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
private long prevDeadlineNanos = NONE; private long prevDeadlineNanos = NONE;
private boolean pendingWakeup; private boolean pendingWakeup;
//private final FileDescriptor eventFd;
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp); super(parent, executor, addTaskWakesUp);
@ -146,10 +145,12 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
event.setId(eventId); event.setId(eventId);
event.setOp(EventType.TIMEOUT); event.setOp(EventType.TIMEOUT);
addNewEvent(event); addNewEvent(event);
submissionQueue.addTimeout(curDeadlineNanos, eventId); submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), eventId);
submissionQueue.submit();
} }
// Block if there is nothing to process. // Block if there is nothing to process.
logger.debug("ioUringWaitCqe {}", this.toString());
ioUringCqe = completionQueue.ioUringWaitCqe(); ioUringCqe = completionQueue.ioUringWaitCqe();
} finally { } finally {
@ -159,22 +160,22 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
} }
} else { } else {
// Just poll as there are tasks to process so we don't want to block. // Just poll as there are tasks to process so we don't want to block.
logger.debug("poll {}", this.toString());
ioUringCqe = completionQueue.poll(); ioUringCqe = completionQueue.poll();
} }
logger.info("ioUringWaitCqe {}", this.toString());
while (ioUringCqe != null) { while (ioUringCqe != null) {
final Event event = events.get(ioUringCqe.getEventId()); final Event event = events.get(ioUringCqe.getEventId());
if (event != null) { if (event != null) {
logger.info("EventType Incoming: " + event.getOp().name()); logger.debug("EventType Incoming: " + event.getOp().name());
processEvent(ioUringCqe.getRes(), event); processEvent(ioUringCqe.getRes(), event);
} }
// Process one entry after the other until there are none left. This will ensure we process // Process one entry after the other until there are none left. This will ensure we process
// all of these before we try to consume tasks. // all of these before we try to consume tasks.
ioUringCqe = completionQueue.poll(); ioUringCqe = completionQueue.poll();
logger.info("ioUringWaitCqe {}", this.toString()); logger.debug("poll {}", this.toString());
} }
if (hasTasks()) { if (hasTasks()) {
@ -314,6 +315,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp()); submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp());
// Submit so its picked up // Submit so its picked up
submissionQueue.submit(); submissionQueue.submit();
break;
case POLL_LINK: case POLL_LINK:
//Todo error handling error //Todo error handling error
logger.info("POLL_LINK Res: {}", res); logger.info("POLL_LINK Res: {}", res);