From 907a71c9305b5b0248c7c22fec14c76881fb7a96 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Sun, 13 Sep 2020 06:46:39 -0700 Subject: [PATCH] Further reduce io_uring syscalls (#10542) Motivation IOUringEventLoop can be streamlined to further reduce io_uring_enter calls Modification - Don't prepare to block-wait until all available work is exhausted - Combine submission with GETEVENTS Result Hopefully faster --- .../channel/uring/AbstractIOUringChannel.java | 7 +- .../channel/uring/IOUringCompletionQueue.java | 51 ++-- .../netty/channel/uring/IOUringEventLoop.java | 217 +++++++++--------- .../channel/uring/IOUringSubmissionQueue.java | 54 +++-- .../io/netty/channel/uring/NativeTest.java | 22 +- 5 files changed, 186 insertions(+), 165 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 3b0ed2d462..d23e2c5bd6 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -163,6 +163,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha @Override protected abstract AbstractUringUnsafe newUnsafe(); + AbstractUringUnsafe ioUringUnsafe() { + return (AbstractUringUnsafe) unsafe(); + } + @Override protected boolean isCompatible(final EventLoop loop) { return loop instanceof IOUringEventLoop; @@ -255,9 +259,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha // Channel/ChannelHandlerContext.read() was called @Override protected void doBeginRead() { - final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe(); if ((ioState & POLL_IN_SCHEDULED) == 0) { - unsafe.schedulePollIn(); + ioUringUnsafe().schedulePollIn(); } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java index adc3bbfc01..aa9bafb728 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java @@ -40,6 +40,9 @@ final class IOUringCompletionQueue { private final long ringAddress; private final int ringFd; + private final int ringMask; + private int ringHead; + IOUringCompletionQueue(long kHeadAddress, long kTailAddress, long kringMaskAddress, long kringEntries, long kOverflowAddress, long completionQueueArrayAddress, int ringSize, long ringAddress, int ringFd) { this.kHeadAddress = kHeadAddress; @@ -51,44 +54,42 @@ final class IOUringCompletionQueue { this.ringSize = ringSize; this.ringAddress = ringAddress; this.ringFd = ringFd; + + this.ringMask = PlatformDependent.getIntVolatile(kringMaskAddress); + this.ringHead = PlatformDependent.getIntVolatile(kHeadAddress); + } + + public boolean hasCompletions() { + return ringHead != PlatformDependent.getIntVolatile(kTailAddress); } public int process(IOUringCompletionQueueCallback callback) { + int tail = PlatformDependent.getIntVolatile(kTailAddress); int i = 0; - for (;;) { - long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); - if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) { - long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress)); - long cqe = index * CQE_SIZE + completionQueueArrayAddress; + while (ringHead != tail) { + long cqeAddress = completionQueueArrayAddress + (ringHead & ringMask) * CQE_SIZE; - long udata = PlatformDependent.getLong(cqe + CQE_USER_DATA_FIELD); - int res = PlatformDependent.getInt(cqe + CQE_RES_FIELD); - long flags = toUnsignedLong(PlatformDependent.getInt(cqe + CQE_FLAGS_FIELD)); + long udata = PlatformDependent.getLong(cqeAddress + CQE_USER_DATA_FIELD); + int res = PlatformDependent.getInt(cqeAddress + CQE_RES_FIELD); + int flags = PlatformDependent.getInt(cqeAddress + CQE_FLAGS_FIELD); - //Ensure that the kernel only sees the new value of the head index after the CQEs have been read. - PlatformDependent.putIntOrdered(kHeadAddress, (int) (head + 1)); + //Ensure that the kernel only sees the new value of the head index after the CQEs have been read. + ringHead++; + PlatformDependent.putIntOrdered(kHeadAddress, ringHead); - int fd = (int) (udata >> 32); - int opMask = (int) udata; - short op = (short) (opMask >> 16); - short mask = (short) opMask; + int fd = (int) (udata >>> 32); + int opMask = (int) (udata & 0xFFFFFFFFL); + int op = opMask >>> 16; + int mask = opMask & 0xffff; - i++; - if (!callback.handle(fd, res, flags, op, mask)) { - break; - } - } else { - if (i == 0) { - return -1; - } - return i; - } + i++; + callback.handle(fd, res, flags, op, mask); } return i; } interface IOUringCompletionQueueCallback { - boolean handle(int fd, int res, long flags, int op, int mask); + void handle(int fd, int res, int flags, int op, int mask); } public boolean ioUringWaitCqe() { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 415d6adc3f..1528c60943 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -136,149 +136,154 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements // Lets add the eventfd related events before starting to do any real work. addEventFdRead(submissionQueue); - submissionQueue.submit(); for (;;) { - logger.trace("Run IOUringEventLoop {}", this.toString()); - long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); - if (curDeadlineNanos == -1L) { - curDeadlineNanos = NONE; // nothing on the calendar - } - nextWakeupNanos.set(curDeadlineNanos); + try { + logger.trace("Run IOUringEventLoop {}", this); - // Only submit a timeout if there are no tasks to process and do a blocking operation - // on the completionQueue. - if (!hasTasks()) { + // Prepare to block wait + long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); + if (curDeadlineNanos == -1L) { + curDeadlineNanos = NONE; // nothing on the calendar + } + nextWakeupNanos.set(curDeadlineNanos); + + // Only submit a timeout if there are no tasks to process and do a blocking operation + // on the completionQueue. try { - if (curDeadlineNanos != prevDeadlineNanos) { - prevDeadlineNanos = curDeadlineNanos; - submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos)); - submissionQueue.submit(); - } + if (!hasTasks()) { + if (curDeadlineNanos != prevDeadlineNanos) { + prevDeadlineNanos = curDeadlineNanos; + submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos)); + } - // Check there were any completion events to process - if (completionQueue.process(this) == -1) { - // Block if there is nothing to process after this try again to call process(....) - logger.trace("ioUringWaitCqe {}", this.toString()); - completionQueue.ioUringWaitCqe(); + // Check there were any completion events to process + if (!completionQueue.hasCompletions()) { + // Block if there is nothing to process after this try again to call process(....) + logger.trace("submitAndWait {}", this); + submissionQueue.submitAndWait(); + } } - } catch (Throwable t) { - //Todo handle exception } finally { if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) { pendingWakeup = true; } } - } - - completionQueue.process(this); - - // Always call runAllTasks() as it will also fetch the scheduled tasks that are ready. - runAllTasks(); - - submissionQueue.submit(); - try { - if (isShuttingDown()) { - closeAll(); - submissionQueue.submit(); - - if (confirmShutdown()) { - break; - } - } } catch (Throwable t) { - logger.info("Exception error: {}", t); + handleLoopException(t); } + + // Avoid blocking for as long as possible - loop until available work exhausted + boolean maybeMoreWork = true; + do { + try { + // CQE processing can produce tasks, and new CQEs could arrive while + // processing tasks. So run both on every iteration and break when + // they both report that nothing was done (| means always run both). + maybeMoreWork = completionQueue.process(this) != 0 | runAllTasks(); + } catch (Throwable t) { + handleLoopException(t); + } + // Always handle shutdown even if the loop processing threw an exception + try { + if (isShuttingDown()) { + closeAll(); + if (confirmShutdown()) { + return; + } + if (!maybeMoreWork) { + maybeMoreWork = hasTasks() || completionQueue.hasCompletions(); + } + } + } catch (Throwable t) { + handleLoopException(t); + } + } while (maybeMoreWork); + } + } + + /** + * Visible only for testing! + */ + void handleLoopException(Throwable t) { + logger.warn("Unexpected exception in the io_uring event loop", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. } } @Override - public boolean handle(int fd, int res, long flags, int op, int pollMask) { - final AbstractIOUringChannel channel; - if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT) { - if (eventfd.intValue() == fd) { - channel = null; - if (res != Native.ERRNO_ECANCELED_NEGATIVE) { - pendingWakeup = false; - addEventFdRead(ringBuffer.getIoUringSubmissionQueue()); - } - } else { - channel = handleRead(fd, res); + public void handle(int fd, int res, int flags, int op, int pollMask) { + if (op == Native.IORING_OP_READ && eventfd.intValue() == fd) { + if (res != Native.ERRNO_ECANCELED_NEGATIVE) { + pendingWakeup = false; + addEventFdRead(ringBuffer.getIoUringSubmissionQueue()); } - } else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE) { - channel = handleWrite(fd, res); - } else if (op == Native.IORING_OP_POLL_ADD) { - channel = handlePollAdd(fd, res, pollMask); - } else if (op == Native.IORING_OP_POLL_REMOVE) { - if (res == Errors.ERRNO_ENOENT_NEGATIVE) { - logger.trace("IORING_POLL_REMOVE not successful"); - } else if (res == 0) { - logger.trace("IORING_POLL_REMOVE successful"); - } - - channel = channels.get(fd); - if (channel != null && !channel.ioScheduled()) { - // We cancelled the POLL ops which means we are done and should remove the mapping. - channels.remove(fd); - } - } else if (op == Native.IORING_OP_CONNECT) { - channel = handleConnect(fd, res); } else if (op == Native.IORING_OP_TIMEOUT) { if (res == Native.ERRNO_ETIME_NEGATIVE) { prevDeadlineNanos = NONE; } - channel = null; } else { - return true; + // Remaining events should be channel-specific + final AbstractIOUringChannel channel = channels.get(fd); + if (channel == null) { + return; + } + if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT) { + handleRead(channel, res); + } else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE) { + handleWrite(channel, res); + } else if (op == Native.IORING_OP_POLL_ADD) { + handlePollAdd(channel, res, pollMask); + } else if (op == Native.IORING_OP_POLL_REMOVE) { + if (res == Errors.ERRNO_ENOENT_NEGATIVE) { + logger.trace("IORING_POLL_REMOVE not successful"); + } else if (res == 0) { + logger.trace("IORING_POLL_REMOVE successful"); + } + if (!channel.ioScheduled()) { + // We cancelled the POLL ops which means we are done and should remove the mapping. + channels.remove(fd); + return; + } + } else if (op == Native.IORING_OP_CONNECT) { + handleConnect(channel, res); + } + channel.ioUringUnsafe().processDelayedClose(); } - if (channel != null) { - ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).processDelayedClose(); - } - return true; } - private AbstractIOUringChannel handleRead(int fd, int res) { - AbstractIOUringChannel readChannel = channels.get(fd); - if (readChannel != null) { - ((AbstractIOUringChannel.AbstractUringUnsafe) readChannel.unsafe()).readComplete(res); - } - return readChannel; + private void handleRead(AbstractIOUringChannel channel, int res) { + channel.ioUringUnsafe().readComplete(res); } - private AbstractIOUringChannel handleWrite(int fd, int res) { - AbstractIOUringChannel writeChannel = channels.get(fd); - if (writeChannel != null) { - ((AbstractIOUringChannel.AbstractUringUnsafe) writeChannel.unsafe()).writeComplete(res); - } - return writeChannel; + private void handleWrite(AbstractIOUringChannel channel, int res) { + channel.ioUringUnsafe().writeComplete(res); } - private AbstractIOUringChannel handlePollAdd(int fd, int res, int pollMask) { - AbstractIOUringChannel channel = channels.get(fd); - if (channel != null) { - if ((pollMask & Native.POLLOUT) != 0) { - ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollOut(res); - } - if ((pollMask & Native.POLLIN) != 0) { - ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollIn(res); - } - if ((pollMask & Native.POLLRDHUP) != 0) { - ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollRdHup(res); - } + private void handlePollAdd(AbstractIOUringChannel channel, int res, int pollMask) { + if ((pollMask & Native.POLLOUT) != 0) { + channel.ioUringUnsafe().pollOut(res); + } + if ((pollMask & Native.POLLIN) != 0) { + channel.ioUringUnsafe().pollIn(res); + } + if ((pollMask & Native.POLLRDHUP) != 0) { + channel.ioUringUnsafe().pollRdHup(res); } - return channel; } private void addEventFdRead(IOUringSubmissionQueue submissionQueue) { submissionQueue.addRead(eventfd.intValue(), eventfdReadBuf, 0, 8); } - private AbstractIOUringChannel handleConnect(int fd, int res) { - AbstractIOUringChannel channel = channels.get(fd); - if (channel != null) { - ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).connectComplete(res); - } - return channel; + private void handleConnect(AbstractIOUringChannel channel, int res) { + channel.ioUringUnsafe().connectComplete(res); } @Override diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 53204e7004..ec820d2f4d 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -101,15 +101,18 @@ final class IOUringSubmissionQueue { } private boolean enqueueSqe(int op, int rwFlags, int fd, long bufferAddress, int length, long offset) { - boolean submitted = false; int pending = tail - head; - if (pending == ringEntries) { - submit(); - submitted = true; + boolean submit = pending == ringEntries; + if (submit) { + int submitted = submit(); + if (submitted == 0) { + // We have a problem, could not submit to make more room in the ring + throw new RuntimeException("SQ ring full and no submissions accepted"); + } } long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE; setData(sqe, op, rwFlags, fd, bufferAddress, length, offset); - return submitted; + return submit; } private void setData(long sqe, int op, int rwFlags, int fd, long bufferAddress, int length, long offset) { @@ -185,20 +188,36 @@ final class IOUringSubmissionQueue { return enqueueSqe(Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0); } - public void submit() { + public int submit() { + int submit = tail - head; + return submit > 0 ? submit(submit, 0, 0) : 0; + } + + public int submitAndWait() { int submit = tail - head; if (submit > 0) { - PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier - int ret = Native.ioUringEnter(ringFd, submit, 0, 0); - head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier - if (ret != submit) { - if (ret < 0) { - throw new RuntimeException("ioUringEnter syscall"); - } - logger.warn("Not all submissions succeeded"); - } - submissionCallback.run(); + return submit(submit, 1, Native.IORING_ENTER_GETEVENTS); } + assert submit == 0; + int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS); + if (ret < 0) { + throw new RuntimeException("ioUringEnter syscall returned " + ret); + } + return ret; // should be 0 + } + + private int submit(int toSubmit, int minComplete, int flags) { + PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier + int ret = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags); + head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier + if (ret != toSubmit) { + if (ret < 0) { + throw new RuntimeException("ioUringEnter syscall returned " + ret); + } + logger.warn("Not all submissions succeeded"); + } + submissionCallback.run(); + return ret; } private void setTimeout(long timeoutNanoSeconds) { @@ -231,6 +250,8 @@ final class IOUringSubmissionQueue { PlatformDependent.freeMemory(timeoutMemoryAddress); } + // The getters below are called from JNI code + public int getRingEntries() { return this.ringEntries; } @@ -254,5 +275,4 @@ final class IOUringSubmissionQueue { public long getRingAddress() { return this.ringAddress; } - } diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index 17367fc3fa..f18b8ade40 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -61,10 +61,9 @@ public class NativeTest { assertTrue(completionQueue.ioUringWaitCqe()); assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { + public void handle(int fd, int res, int flags, int op, int mask) { assertEquals(inputString.length(), res); writeEventByteBuf.release(); - return true; } })); @@ -76,10 +75,9 @@ public class NativeTest { assertTrue(completionQueue.ioUringWaitCqe()); assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { + public void handle(int fd, int res, int flags, int op, int mask) { assertEquals(inputString.length(), res); readEventByteBuf.writerIndex(res); - return true; } })); byte[] dataRead = new byte[inputString.length()]; @@ -109,9 +107,8 @@ public class NativeTest { try { completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { + public void handle(int fd, int res, int flags, int op, int mask) { assertEquals(-62, res); - return true; } }); } catch (Exception e) { @@ -158,9 +155,8 @@ public class NativeTest { assertTrue(completionQueue.ioUringWaitCqe()); assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { + public void handle(int fd, int res, int flags, int op, int mask) { assertEquals(1, res); - return true; } })); try { @@ -190,9 +186,8 @@ public class NativeTest { assertTrue(completionQueue.ioUringWaitCqe()); assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { + public void handle(int fd, int res, int flags, int op, int mask) { assertEquals(1, res); - return true; } })); } @@ -242,7 +237,7 @@ public class NativeTest { private final IOUringCompletionQueue.IOUringCompletionQueueCallback verifyCallback = new IOUringCompletionQueue.IOUringCompletionQueueCallback() { @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { + public void handle(int fd, int res, int flags, int op, int mask) { if (op == Native.IORING_OP_POLL_ADD) { assertEquals(Native.ERRNO_ECANCELED_NEGATIVE, res); } else if (op == Native.IORING_OP_POLL_REMOVE) { @@ -250,7 +245,6 @@ public class NativeTest { } else { fail("op " + op); } - return false; } }; @@ -258,9 +252,7 @@ public class NativeTest { public void run() { try { assertTrue(completionQueue.ioUringWaitCqe()); - assertEquals(1, completionQueue.process(verifyCallback)); - assertTrue(completionQueue.ioUringWaitCqe()); - assertEquals(1, completionQueue.process(verifyCallback)); + assertEquals(2, completionQueue.process(verifyCallback)); } catch (AssertionError error) { errorRef.set(error); }