Further reduce io_uring syscalls (#10542)

Motivation

IOUringEventLoop can be streamlined to further reduce io_uring_enter
calls

Modification

- Don't prepare to block-wait until all available work is exhausted
- Combine submission with GETEVENTS

Result

Fewer io_uring_enter syscalls per event loop iteration; hopefully faster
This commit is contained in:
Nick Hill 2020-09-13 06:46:39 -07:00 committed by GitHub
parent f674d15865
commit 907a71c930
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 186 additions and 165 deletions

View File

@ -163,6 +163,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
@Override
protected abstract AbstractUringUnsafe newUnsafe();
/**
 * Returns this channel's {@code unsafe()} narrowed to {@link AbstractUringUnsafe}, so callers
 * do not have to repeat the cast themselves.
 *
 * @return the result of {@code unsafe()} cast to {@code AbstractUringUnsafe}
 */
AbstractUringUnsafe ioUringUnsafe() {
    // The cast presumably always succeeds because newUnsafe() is declared to return
    // AbstractUringUnsafe - TODO confirm unsafe() only ever hands out that instance.
    final AbstractUringUnsafe narrowed = (AbstractUringUnsafe) unsafe();
    return narrowed;
}
@Override
protected boolean isCompatible(final EventLoop loop) {
return loop instanceof IOUringEventLoop;
@ -255,9 +259,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
// Channel/ChannelHandlerContext.read() was called
@Override
protected void doBeginRead() {
final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe();
if ((ioState & POLL_IN_SCHEDULED) == 0) {
unsafe.schedulePollIn();
ioUringUnsafe().schedulePollIn();
}
}

View File

@ -40,6 +40,9 @@ final class IOUringCompletionQueue {
private final long ringAddress;
private final int ringFd;
private final int ringMask;
private int ringHead;
IOUringCompletionQueue(long kHeadAddress, long kTailAddress, long kringMaskAddress, long kringEntries,
long kOverflowAddress, long completionQueueArrayAddress, int ringSize, long ringAddress, int ringFd) {
this.kHeadAddress = kHeadAddress;
@ -51,44 +54,42 @@ final class IOUringCompletionQueue {
this.ringSize = ringSize;
this.ringAddress = ringAddress;
this.ringFd = ringFd;
this.ringMask = PlatformDependent.getIntVolatile(kringMaskAddress);
this.ringHead = PlatformDependent.getIntVolatile(kHeadAddress);
}
/**
 * Tells whether there are completion queue entries that have not been consumed yet.
 * Nothing is consumed by this call; it only compares indices.
 *
 * @return {@code true} if the locally tracked head index differs from the tail index
 *         currently stored at {@code kTailAddress}
 */
public boolean hasCompletions() {
    // Volatile read so the most recently published tail value is observed.
    final int currentTail = PlatformDependent.getIntVolatile(kTailAddress);
    return currentTail != ringHead;
}
public int process(IOUringCompletionQueueCallback callback) {
int tail = PlatformDependent.getIntVolatile(kTailAddress);
int i = 0;
for (;;) {
long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) {
long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress));
long cqe = index * CQE_SIZE + completionQueueArrayAddress;
while (ringHead != tail) {
long cqeAddress = completionQueueArrayAddress + (ringHead & ringMask) * CQE_SIZE;
long udata = PlatformDependent.getLong(cqe + CQE_USER_DATA_FIELD);
int res = PlatformDependent.getInt(cqe + CQE_RES_FIELD);
long flags = toUnsignedLong(PlatformDependent.getInt(cqe + CQE_FLAGS_FIELD));
long udata = PlatformDependent.getLong(cqeAddress + CQE_USER_DATA_FIELD);
int res = PlatformDependent.getInt(cqeAddress + CQE_RES_FIELD);
int flags = PlatformDependent.getInt(cqeAddress + CQE_FLAGS_FIELD);
//Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
PlatformDependent.putIntOrdered(kHeadAddress, (int) (head + 1));
//Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
ringHead++;
PlatformDependent.putIntOrdered(kHeadAddress, ringHead);
int fd = (int) (udata >> 32);
int opMask = (int) udata;
short op = (short) (opMask >> 16);
short mask = (short) opMask;
int fd = (int) (udata >>> 32);
int opMask = (int) (udata & 0xFFFFFFFFL);
int op = opMask >>> 16;
int mask = opMask & 0xffff;
i++;
if (!callback.handle(fd, res, flags, op, mask)) {
break;
}
} else {
if (i == 0) {
return -1;
}
return i;
}
i++;
callback.handle(fd, res, flags, op, mask);
}
return i;
}
interface IOUringCompletionQueueCallback {
boolean handle(int fd, int res, long flags, int op, int mask);
void handle(int fd, int res, int flags, int op, int mask);
}
public boolean ioUringWaitCqe() {

View File

@ -136,149 +136,154 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
// Lets add the eventfd related events before starting to do any real work.
addEventFdRead(submissionQueue);
submissionQueue.submit();
for (;;) {
logger.trace("Run IOUringEventLoop {}", this.toString());
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
logger.trace("Run IOUringEventLoop {}", this);
// Only submit a timeout if there are no tasks to process and do a blocking operation
// on the completionQueue.
if (!hasTasks()) {
// Prepare to block wait
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
// Only submit a timeout if there are no tasks to process and do a blocking operation
// on the completionQueue.
try {
if (curDeadlineNanos != prevDeadlineNanos) {
prevDeadlineNanos = curDeadlineNanos;
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos));
submissionQueue.submit();
}
if (!hasTasks()) {
if (curDeadlineNanos != prevDeadlineNanos) {
prevDeadlineNanos = curDeadlineNanos;
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos));
}
// Check there were any completion events to process
if (completionQueue.process(this) == -1) {
// Block if there is nothing to process after this try again to call process(....)
logger.trace("ioUringWaitCqe {}", this.toString());
completionQueue.ioUringWaitCqe();
// Check there were any completion events to process
if (!completionQueue.hasCompletions()) {
// Block if there is nothing to process after this try again to call process(....)
logger.trace("submitAndWait {}", this);
submissionQueue.submitAndWait();
}
}
} catch (Throwable t) {
//Todo handle exception
} finally {
if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
pendingWakeup = true;
}
}
}
completionQueue.process(this);
// Always call runAllTasks() as it will also fetch the scheduled tasks that are ready.
runAllTasks();
submissionQueue.submit();
try {
if (isShuttingDown()) {
closeAll();
submissionQueue.submit();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.info("Exception error: {}", t);
handleLoopException(t);
}
// Avoid blocking for as long as possible - loop until available work exhausted
boolean maybeMoreWork = true;
do {
try {
// CQE processing can produce tasks, and new CQEs could arrive while
// processing tasks. So run both on every iteration and break when
// they both report that nothing was done (| means always run both).
maybeMoreWork = completionQueue.process(this) != 0 | runAllTasks();
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
if (!maybeMoreWork) {
maybeMoreWork = hasTasks() || completionQueue.hasCompletions();
}
}
} catch (Throwable t) {
handleLoopException(t);
}
} while (maybeMoreWork);
}
}
/**
 * Logs an unexpected failure caught by the event loop and then pauses the calling thread
 * for one second before returning.
 * <p>
 * Visible only for testing!
 *
 * @param t the failure that was caught by the loop
 */
void handleLoopException(Throwable t) {
    logger.warn("Unexpected exception in the io_uring event loop", t);

    // Prevent possible consecutive immediate failures that lead to
    // excessive CPU consumption.
    final long backoffMillis = 1000;
    try {
        Thread.sleep(backoffMillis);
    } catch (InterruptedException ignored) {
        // Ignore.
    }
}
@Override
public boolean handle(int fd, int res, long flags, int op, int pollMask) {
final AbstractIOUringChannel channel;
if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT) {
if (eventfd.intValue() == fd) {
channel = null;
if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
pendingWakeup = false;
addEventFdRead(ringBuffer.getIoUringSubmissionQueue());
}
} else {
channel = handleRead(fd, res);
public void handle(int fd, int res, int flags, int op, int pollMask) {
if (op == Native.IORING_OP_READ && eventfd.intValue() == fd) {
if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
pendingWakeup = false;
addEventFdRead(ringBuffer.getIoUringSubmissionQueue());
}
} else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE) {
channel = handleWrite(fd, res);
} else if (op == Native.IORING_OP_POLL_ADD) {
channel = handlePollAdd(fd, res, pollMask);
} else if (op == Native.IORING_OP_POLL_REMOVE) {
if (res == Errors.ERRNO_ENOENT_NEGATIVE) {
logger.trace("IORING_POLL_REMOVE not successful");
} else if (res == 0) {
logger.trace("IORING_POLL_REMOVE successful");
}
channel = channels.get(fd);
if (channel != null && !channel.ioScheduled()) {
// We cancelled the POLL ops which means we are done and should remove the mapping.
channels.remove(fd);
}
} else if (op == Native.IORING_OP_CONNECT) {
channel = handleConnect(fd, res);
} else if (op == Native.IORING_OP_TIMEOUT) {
if (res == Native.ERRNO_ETIME_NEGATIVE) {
prevDeadlineNanos = NONE;
}
channel = null;
} else {
return true;
// Remaining events should be channel-specific
final AbstractIOUringChannel channel = channels.get(fd);
if (channel == null) {
return;
}
if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT) {
handleRead(channel, res);
} else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE) {
handleWrite(channel, res);
} else if (op == Native.IORING_OP_POLL_ADD) {
handlePollAdd(channel, res, pollMask);
} else if (op == Native.IORING_OP_POLL_REMOVE) {
if (res == Errors.ERRNO_ENOENT_NEGATIVE) {
logger.trace("IORING_POLL_REMOVE not successful");
} else if (res == 0) {
logger.trace("IORING_POLL_REMOVE successful");
}
if (!channel.ioScheduled()) {
// We cancelled the POLL ops which means we are done and should remove the mapping.
channels.remove(fd);
return;
}
} else if (op == Native.IORING_OP_CONNECT) {
handleConnect(channel, res);
}
channel.ioUringUnsafe().processDelayedClose();
}
if (channel != null) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).processDelayedClose();
}
return true;
}
private AbstractIOUringChannel handleRead(int fd, int res) {
AbstractIOUringChannel readChannel = channels.get(fd);
if (readChannel != null) {
((AbstractIOUringChannel.AbstractUringUnsafe) readChannel.unsafe()).readComplete(res);
}
return readChannel;
/**
 * Forwards the result of a completed read or accept operation to the given channel.
 *
 * @param channel the channel the completion belongs to
 * @param res     the result value reported by the completion queue entry
 */
private void handleRead(AbstractIOUringChannel channel, int res) {
    final AbstractIOUringChannel.AbstractUringUnsafe unsafe = channel.ioUringUnsafe();
    unsafe.readComplete(res);
}
private AbstractIOUringChannel handleWrite(int fd, int res) {
AbstractIOUringChannel writeChannel = channels.get(fd);
if (writeChannel != null) {
((AbstractIOUringChannel.AbstractUringUnsafe) writeChannel.unsafe()).writeComplete(res);
}
return writeChannel;
/**
 * Forwards the result of a completed write (WRITE or WRITEV) operation to the given channel.
 *
 * @param channel the channel the completion belongs to
 * @param res     the result value reported by the completion queue entry
 */
private void handleWrite(AbstractIOUringChannel channel, int res) {
    final AbstractIOUringChannel.AbstractUringUnsafe unsafe = channel.ioUringUnsafe();
    unsafe.writeComplete(res);
}
private AbstractIOUringChannel handlePollAdd(int fd, int res, int pollMask) {
AbstractIOUringChannel channel = channels.get(fd);
if (channel != null) {
if ((pollMask & Native.POLLOUT) != 0) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollOut(res);
}
if ((pollMask & Native.POLLIN) != 0) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollIn(res);
}
if ((pollMask & Native.POLLRDHUP) != 0) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollRdHup(res);
}
private void handlePollAdd(AbstractIOUringChannel channel, int res, int pollMask) {
if ((pollMask & Native.POLLOUT) != 0) {
channel.ioUringUnsafe().pollOut(res);
}
if ((pollMask & Native.POLLIN) != 0) {
channel.ioUringUnsafe().pollIn(res);
}
if ((pollMask & Native.POLLRDHUP) != 0) {
channel.ioUringUnsafe().pollRdHup(res);
}
return channel;
}
/**
 * Queues a read on the event loop's eventfd into the given submission queue. The entry is
 * only added here; this method does not submit the queue.
 *
 * @param submissionQueue the queue that receives the read request
 */
private void addEventFdRead(IOUringSubmissionQueue submissionQueue) {
    final int wakeupFd = eventfd.intValue();
    // NOTE(review): the trailing 0 and 8 are presumably the buffer position and the
    // 8-byte eventfd counter length - confirm against addRead's signature.
    submissionQueue.addRead(wakeupFd, eventfdReadBuf, 0, 8);
}
private AbstractIOUringChannel handleConnect(int fd, int res) {
AbstractIOUringChannel channel = channels.get(fd);
if (channel != null) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).connectComplete(res);
}
return channel;
/**
 * Forwards the result of a completed connect operation to the given channel.
 *
 * @param channel the channel the completion belongs to
 * @param res     the result value reported by the completion queue entry
 */
private void handleConnect(AbstractIOUringChannel channel, int res) {
    final AbstractIOUringChannel.AbstractUringUnsafe unsafe = channel.ioUringUnsafe();
    unsafe.connectComplete(res);
}
@Override

View File

@ -101,15 +101,18 @@ final class IOUringSubmissionQueue {
}
private boolean enqueueSqe(int op, int rwFlags, int fd, long bufferAddress, int length, long offset) {
boolean submitted = false;
int pending = tail - head;
if (pending == ringEntries) {
submit();
submitted = true;
boolean submit = pending == ringEntries;
if (submit) {
int submitted = submit();
if (submitted == 0) {
// We have a problem, could not submit to make more room in the ring
throw new RuntimeException("SQ ring full and no submissions accepted");
}
}
long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
setData(sqe, op, rwFlags, fd, bufferAddress, length, offset);
return submitted;
return submit;
}
private void setData(long sqe, int op, int rwFlags, int fd, long bufferAddress, int length, long offset) {
@ -185,20 +188,36 @@ final class IOUringSubmissionQueue {
return enqueueSqe(Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0);
}
public void submit() {
/**
 * Submits every entry queued locally ({@code tail - head}) without asking to wait for
 * completions.
 *
 * @return the value returned by the underlying submission, or {@code 0} when nothing
 *         was pending and no syscall was made
 */
public int submit() {
    final int pending = tail - head;
    if (pending <= 0) {
        // Nothing queued, so skip the syscall entirely.
        return 0;
    }
    return submit(pending, 0, 0);
}
public int submitAndWait() {
int submit = tail - head;
if (submit > 0) {
PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier
int ret = Native.ioUringEnter(ringFd, submit, 0, 0);
head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier
if (ret != submit) {
if (ret < 0) {
throw new RuntimeException("ioUringEnter syscall");
}
logger.warn("Not all submissions succeeded");
}
submissionCallback.run();
return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
}
assert submit == 0;
int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
if (ret < 0) {
throw new RuntimeException("ioUringEnter syscall returned " + ret);
}
return ret; // should be 0
}
/**
 * Publishes the local tail index, enters the ring and refreshes the local head index.
 * {@code submissionCallback} is run afterwards unless an exception was thrown.
 *
 * @param toSubmit    number of entries passed to {@code ioUringEnter} for submission
 * @param minComplete minimum completion count passed through to {@code ioUringEnter}
 * @param flags       flags passed through to {@code ioUringEnter}
 * @return the value returned by {@code ioUringEnter}
 * @throws RuntimeException if {@code ioUringEnter} returns a negative value
 */
private int submit(int toSubmit, int minComplete, int flags) {
    // Ordered store: release memory barrier before the ring is entered.
    PlatformDependent.putIntOrdered(kTailAddress, tail);
    final int accepted = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags);
    // Volatile load: acquire memory barrier when picking up the new head.
    head = PlatformDependent.getIntVolatile(kHeadAddress);

    final boolean mismatch = accepted != toSubmit;
    if (mismatch && accepted < 0) {
        throw new RuntimeException("ioUringEnter syscall returned " + accepted);
    }
    if (mismatch) {
        logger.warn("Not all submissions succeeded");
    }
    submissionCallback.run();
    return accepted;
}
private void setTimeout(long timeoutNanoSeconds) {
@ -231,6 +250,8 @@ final class IOUringSubmissionQueue {
PlatformDependent.freeMemory(timeoutMemoryAddress);
}
// The getters below are called from JNI code
/**
 * Returns the value of the {@code ringEntries} field. This getter is called from JNI code.
 *
 * @return the stored {@code ringEntries} value
 */
public int getRingEntries() {
    return ringEntries;
}
@ -254,5 +275,4 @@ final class IOUringSubmissionQueue {
/**
 * Returns the value of the {@code ringAddress} field. This getter is called from JNI code.
 *
 * @return the stored {@code ringAddress} value
 */
public long getRingAddress() {
    return ringAddress;
}
}

View File

@ -61,10 +61,9 @@ public class NativeTest {
assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
public void handle(int fd, int res, int flags, int op, int mask) {
assertEquals(inputString.length(), res);
writeEventByteBuf.release();
return true;
}
}));
@ -76,10 +75,9 @@ public class NativeTest {
assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
public void handle(int fd, int res, int flags, int op, int mask) {
assertEquals(inputString.length(), res);
readEventByteBuf.writerIndex(res);
return true;
}
}));
byte[] dataRead = new byte[inputString.length()];
@ -109,9 +107,8 @@ public class NativeTest {
try {
completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
public void handle(int fd, int res, int flags, int op, int mask) {
assertEquals(-62, res);
return true;
}
});
} catch (Exception e) {
@ -158,9 +155,8 @@ public class NativeTest {
assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
public void handle(int fd, int res, int flags, int op, int mask) {
assertEquals(1, res);
return true;
}
}));
try {
@ -190,9 +186,8 @@ public class NativeTest {
assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
public void handle(int fd, int res, int flags, int op, int mask) {
assertEquals(1, res);
return true;
}
}));
}
@ -242,7 +237,7 @@ public class NativeTest {
private final IOUringCompletionQueue.IOUringCompletionQueueCallback verifyCallback =
new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
public void handle(int fd, int res, int flags, int op, int mask) {
if (op == Native.IORING_OP_POLL_ADD) {
assertEquals(Native.ERRNO_ECANCELED_NEGATIVE, res);
} else if (op == Native.IORING_OP_POLL_REMOVE) {
@ -250,7 +245,6 @@ public class NativeTest {
} else {
fail("op " + op);
}
return false;
}
};
@ -258,9 +252,7 @@ public class NativeTest {
public void run() {
try {
assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, completionQueue.process(verifyCallback));
assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, completionQueue.process(verifyCallback));
assertEquals(2, completionQueue.process(verifyCallback));
} catch (AssertionError error) {
errorRef.set(error);
}