From b62668d1d0dd525627936f9b039a06ad2c022ee0 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 25 Aug 2020 12:48:01 +0200 Subject: [PATCH] Fix bug in IOUringEventLoop which may caused eventfd_write to not unblock and make processing more efficient Motivation: There was a bug in the implemention so we missed to submit what was in the submission queue. This could lead to a deadlock. Beside this we should also process all events that we can poll without blocking and only after that process tasks. This will ensure we drain the ringbuffers in a timely manner Modifications: - Add missing submit() call - Rename peek() to poll() as we consume the data so peek() is missleading - Process all events that can be processed without blocking Result: Fix a bug, clarify and better performance --- .../channel/uring/IOUringCompletionQueue.java | 6 ++-- .../netty/channel/uring/IOUringEventLoop.java | 34 ++++++++++++++----- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java index 3b3f1f3ab1..4d512280d8 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java @@ -55,7 +55,7 @@ final class IOUringCompletionQueue { this.ringFd = ringFd; } - public IOUringCqe peek() { + public IOUringCqe poll() { long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) { @@ -75,7 +75,7 @@ final class IOUringCompletionQueue { } public IOUringCqe ioUringWaitCqe() { - IOUringCqe ioUringCqe = peek(); + IOUringCqe ioUringCqe = poll(); if (ioUringCqe != null) { return ioUringCqe; @@ -87,7 +87,7 @@ final class IOUringCompletionQueue { //Todo throw exception! return null; } else if (ret == 0) { - ioUringCqe = peek(); + ioUringCqe = poll(); if (ioUringCqe != null) { return ioUringCqe; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index bebb28de8d..aeaf553b09 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -133,7 +133,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); + IOUringCqe ioUringCqe; + // Only submit a timeout if there are no tasks to process and do a blocking operation + // on the completionQueue. if (!hasTasks()) { try { if (curDeadlineNanos != prevDeadlineNanos) { @@ -145,22 +148,33 @@ final class IOUringEventLoop extends SingleThreadEventLoop { addNewEvent(event); submissionQueue.addTimeout(curDeadlineNanos, eventId); } - final IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); - logger.info("ioUringWaitCqe {}", this.toString()); - if (ioUringCqe != null) { - final Event event = events.get(ioUringCqe.getEventId()); - if (event != null) { - logger.info("EventType Incoming: " + event.getOp().name()); - processEvent(ioUringCqe.getRes(), event); - } - } + // Block if there is nothing to process. + ioUringCqe = completionQueue.ioUringWaitCqe(); } finally { if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) { pendingWakeup = true; } } + } else { + // Just poll as there are tasks to process so we don't want to block. + ioUringCqe = completionQueue.poll(); + } + + logger.info("ioUringWaitCqe {}", this.toString()); + while (ioUringCqe != null) { + final Event event = events.get(ioUringCqe.getEventId()); + + if (event != null) { + logger.info("EventType Incoming: " + event.getOp().name()); + processEvent(ioUringCqe.getRes(), event); + } + + // Process one entry after the other until there are none left. This will ensure we process + // all of these before we try to consume tasks. + ioUringCqe = completionQueue.poll(); + logger.info("ioUringWaitCqe {}", this.toString()); } if (hasTasks()) { @@ -298,6 +312,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop { event.setOp(EventType.POLL_EVENTFD); addNewEvent(event); submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp()); + // Submit so its picked up + submissionQueue.submit(); case POLL_LINK: //Todo error handling error logger.info("POLL_LINK Res: {}", res);