Fix bug in IOUringEventLoop which may caused eventfd_write to not unblock and make processing more efficient
Motivation: There was a bug in the implemention so we missed to submit what was in the submission queue. This could lead to a deadlock. Beside this we should also process all events that we can poll without blocking and only after that process tasks. This will ensure we drain the ringbuffers in a timely manner Modifications: - Add missing submit() call - Rename peek() to poll() as we consume the data so peek() is missleading - Process all events that can be processed without blocking Result: Fix a bug, clarify and better performance
This commit is contained in:
parent
8435c0ce1f
commit
b62668d1d0
@ -55,7 +55,7 @@ final class IOUringCompletionQueue {
|
|||||||
this.ringFd = ringFd;
|
this.ringFd = ringFd;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IOUringCqe peek() {
|
public IOUringCqe poll() {
|
||||||
long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
|
long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
|
||||||
|
|
||||||
if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) {
|
if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) {
|
||||||
@ -75,7 +75,7 @@ final class IOUringCompletionQueue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public IOUringCqe ioUringWaitCqe() {
|
public IOUringCqe ioUringWaitCqe() {
|
||||||
IOUringCqe ioUringCqe = peek();
|
IOUringCqe ioUringCqe = poll();
|
||||||
|
|
||||||
if (ioUringCqe != null) {
|
if (ioUringCqe != null) {
|
||||||
return ioUringCqe;
|
return ioUringCqe;
|
||||||
@ -87,7 +87,7 @@ final class IOUringCompletionQueue {
|
|||||||
//Todo throw exception!
|
//Todo throw exception!
|
||||||
return null;
|
return null;
|
||||||
} else if (ret == 0) {
|
} else if (ret == 0) {
|
||||||
ioUringCqe = peek();
|
ioUringCqe = poll();
|
||||||
|
|
||||||
if (ioUringCqe != null) {
|
if (ioUringCqe != null) {
|
||||||
return ioUringCqe;
|
return ioUringCqe;
|
||||||
|
@ -133,7 +133,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
|||||||
curDeadlineNanos = NONE; // nothing on the calendar
|
curDeadlineNanos = NONE; // nothing on the calendar
|
||||||
}
|
}
|
||||||
nextWakeupNanos.set(curDeadlineNanos);
|
nextWakeupNanos.set(curDeadlineNanos);
|
||||||
|
IOUringCqe ioUringCqe;
|
||||||
|
|
||||||
|
// Only submit a timeout if there are no tasks to process and do a blocking operation
|
||||||
|
// on the completionQueue.
|
||||||
if (!hasTasks()) {
|
if (!hasTasks()) {
|
||||||
try {
|
try {
|
||||||
if (curDeadlineNanos != prevDeadlineNanos) {
|
if (curDeadlineNanos != prevDeadlineNanos) {
|
||||||
@ -145,22 +148,33 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
|||||||
addNewEvent(event);
|
addNewEvent(event);
|
||||||
submissionQueue.addTimeout(curDeadlineNanos, eventId);
|
submissionQueue.addTimeout(curDeadlineNanos, eventId);
|
||||||
}
|
}
|
||||||
final IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe();
|
|
||||||
logger.info("ioUringWaitCqe {}", this.toString());
|
|
||||||
if (ioUringCqe != null) {
|
|
||||||
final Event event = events.get(ioUringCqe.getEventId());
|
|
||||||
|
|
||||||
if (event != null) {
|
// Block if there is nothing to process.
|
||||||
logger.info("EventType Incoming: " + event.getOp().name());
|
ioUringCqe = completionQueue.ioUringWaitCqe();
|
||||||
processEvent(ioUringCqe.getRes(), event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
|
|
||||||
if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
|
if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
|
||||||
pendingWakeup = true;
|
pendingWakeup = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Just poll as there are tasks to process so we don't want to block.
|
||||||
|
ioUringCqe = completionQueue.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("ioUringWaitCqe {}", this.toString());
|
||||||
|
while (ioUringCqe != null) {
|
||||||
|
final Event event = events.get(ioUringCqe.getEventId());
|
||||||
|
|
||||||
|
if (event != null) {
|
||||||
|
logger.info("EventType Incoming: " + event.getOp().name());
|
||||||
|
processEvent(ioUringCqe.getRes(), event);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process one entry after the other until there are none left. This will ensure we process
|
||||||
|
// all of these before we try to consume tasks.
|
||||||
|
ioUringCqe = completionQueue.poll();
|
||||||
|
logger.info("ioUringWaitCqe {}", this.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasTasks()) {
|
if (hasTasks()) {
|
||||||
@ -298,6 +312,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
|||||||
event.setOp(EventType.POLL_EVENTFD);
|
event.setOp(EventType.POLL_EVENTFD);
|
||||||
addNewEvent(event);
|
addNewEvent(event);
|
||||||
submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp());
|
submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp());
|
||||||
|
// Submit so its picked up
|
||||||
|
submissionQueue.submit();
|
||||||
case POLL_LINK:
|
case POLL_LINK:
|
||||||
//Todo error handling error
|
//Todo error handling error
|
||||||
logger.info("POLL_LINK Res: {}", res);
|
logger.info("POLL_LINK Res: {}", res);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user