From 49449b300e3ca925b6fb638e7c1d6ba1a53ad634 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 26 Aug 2020 09:30:50 +0200 Subject: [PATCH] Reduce GC by removing creation of objects related to completion queue and submission queue Motivation: We created a lot of objects related to the completion queue and submission queue, which produced a lot of GC. Besides this, we also maintained an extra map which is not really needed, as we can encode everything that we need in the user_data field. Modification: - Reduce complexity and GC pressure by storing the needed information in the user_data field - Small refactoring of the code to move channel-related logic to the channel - Remove unused classes - Use a callback to process events in the completion queue and so remove all GC created by it - Simplify by not storing channel and buffer in the event Result: Less GC pressure and no extra lookups for events needed --- .../channel/uring/AbstractIOUringChannel.java | 118 +++++-- .../uring/AbstractIOUringServerChannel.java | 52 ++-- .../java/io/netty/channel/uring/Event.java | 60 ---- .../io/netty/channel/uring/EventType.java | 37 --- .../java/io/netty/channel/uring/IOUring.java | 11 +- .../channel/uring/IOUringCompletionQueue.java | 62 ++-- .../io/netty/channel/uring/IOUringCqe.java | 40 --- .../netty/channel/uring/IOUringEventLoop.java | 289 +++++------------- .../channel/uring/IOUringSubmissionQueue.java | 86 ++++-- .../IOUringSocketFixedLengthEchoTest.java | 16 + .../io/netty/channel/uring/NativeTest.java | 77 +++-- 11 files changed, 360 insertions(+), 488 deletions(-) delete mode 100644 transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java delete mode 100644 transport-native-io_uring/src/main/java/io/netty/channel/uring/EventType.java delete mode 100644 transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCqe.java create mode 100644 transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketFixedLengthEchoTest.java diff 
--git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index e374b7ae71..752fd73d49 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -24,6 +24,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; @@ -133,26 +134,107 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha return loop instanceof IOUringEventLoop; } + private ByteBuf readBuffer; + public void doReadBytes(ByteBuf byteBuf) { + assert readBuffer == null; IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); if (byteBuf.hasMemoryAddress()) { - long eventId = ioUringEventLoop.incrementEventIdCounter(); - final Event event = new Event(); - event.setId(eventId); - event.setOp(EventType.READ); - event.setReadBuffer(byteBuf); - event.setAbstractIOUringChannel(this); - submissionQueue.add(eventId, EventType.READ, socket.intValue(), byteBuf.memoryAddress(), + readBuffer = byteBuf; + submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(), byteBuf.writerIndex(), byteBuf.capacity()); - ioUringEventLoop.addNewEvent(event); submissionQueue.submit(); } } + void writeComplete(int res) { + ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); + + if (res > 0) { + channelOutboundBuffer.removeBytes(res); + 
setWriteable(true); + try { + doWrite(channelOutboundBuffer); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + void readComplete(int localReadAmount) { + boolean close = false; + ByteBuf byteBuf = null; + final IOUringRecvByteAllocatorHandle allocHandle = + (IOUringRecvByteAllocatorHandle) unsafe() + .recvBufAllocHandle(); + final ChannelPipeline pipeline = pipeline(); + try { + logger.trace("EventLoop Read Res: {}", localReadAmount); + logger.trace("EventLoop Fd: {}", fd().intValue()); + setUringInReadyPending(false); + byteBuf = this.readBuffer; + this.readBuffer = null; + + if (localReadAmount > 0) { + byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); + } + + allocHandle.lastBytesRead(localReadAmount); + if (allocHandle.lastBytesRead() <= 0) { + // nothing was read, release the buffer. + byteBuf.release(); + byteBuf = null; + close = allocHandle.lastBytesRead() < 0; + if (close) { + // There is nothing left to read as we received an EOF. + shutdownInput(false); + } + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + return; + } + + allocHandle.incMessagesRead(1); + pipeline.fireChannelRead(byteBuf); + byteBuf = null; + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + + logger.trace("READ autoRead {}", config().isAutoRead()); + if (config().isAutoRead()) { + executeReadEvent(); + } + } catch (Throwable t) { + handleReadException(pipeline, byteBuf, t, close, allocHandle); + } + } + + + private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, + Throwable cause, boolean close, + IOUringRecvByteAllocatorHandle allocHandle) { + if (byteBuf != null) { + if (byteBuf.isReadable()) { + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } + } + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(cause); + if (close || cause instanceof IOException) { + shutdownInput(false); + } else { + if (config().isAutoRead()) { + 
executeReadEvent(); + } + } + } + protected final ByteBuf newDirectBuffer(ByteBuf buf) { return newDirectBuffer(buf, buf); } @@ -236,6 +318,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } finally { socket.close(); + if (readBuffer != null) { + readBuffer.release(); + readBuffer = null; + } } } @@ -273,14 +359,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); - final Event event = new Event(); - long eventId = ioUringEventLoop.incrementEventIdCounter(); - event.setId(eventId); - event.setOp(EventType.WRITE); - event.setAbstractIOUringChannel(this); - submissionQueue.add(eventId, EventType.WRITE, socket.intValue(), buf.memoryAddress(), buf.readerIndex(), + submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), buf.writerIndex()); - ioUringEventLoop.addNewEvent(event); submissionQueue.submit(); writeable = false; } @@ -290,13 +370,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha private void addPollOut() { IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); - final Event event = new Event(); - long eventId = ioUringEventLoop.incrementEventIdCounter(); - event.setId(eventId); - event.setOp(EventType.POLL_OUT); - event.setAbstractIOUringChannel(this); - submissionQueue.addPoll(eventId, socket.intValue(), EventType.POLL_OUT); - ioUringEventLoop.addNewEvent(event); + submissionQueue.addPollOut(socket.intValue()); submissionQueue.submit(); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java 
index 3bf36f7b37..2161cc27c4 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -15,11 +15,7 @@ */ package io.netty.channel.uring; -import io.netty.channel.Channel; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.ChannelPromise; -import io.netty.channel.ServerChannel; -import io.netty.channel.unix.Socket; +import io.netty.channel.*; import java.net.SocketAddress; @@ -49,6 +45,27 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple abstract Channel newChildChannel(int fd) throws Exception; + void acceptComplete(int res) { + if (res >= 0) { + final IOUringRecvByteAllocatorHandle allocHandle = + (IOUringRecvByteAllocatorHandle) unsafe() + .recvBufAllocHandle(); + final ChannelPipeline pipeline = pipeline(); + + allocHandle.incMessagesRead(1); + try { + final Channel childChannel = newChildChannel(res); + pipeline.fireChannelRead(childChannel); + } catch (Exception e) { + e.printStackTrace(); + } + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + } + //Todo refactoring method name + executeReadEvent(); + } + final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { private final byte[] acceptedAddress = new byte[26]; @@ -58,36 +75,15 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple promise.setFailure(new UnsupportedOperationException()); } - private void addPoll(IOUringEventLoop ioUringEventLoop) { - - long eventId = ioUringEventLoop.incrementEventIdCounter(); - Event event = new Event(); - event.setOp(EventType.POLL_LINK); - - event.setId(eventId); - event.setAbstractIOUringChannel(AbstractIOUringServerChannel.this); - ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue() - .addPoll(eventId, socket.intValue(), event.getOp()); - ((IOUringEventLoop) 
eventLoop()).addNewEvent(event); - } @Override public void uringEventExecution() { final IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); - - addPoll(ioUringEventLoop); - - long eventId = ioUringEventLoop.incrementEventIdCounter(); - final Event event = new Event(); - event.setId(eventId); - event.setOp(EventType.ACCEPT); - event.setAbstractIOUringChannel(getChannel()); + submissionQueue.addPollLink(socket.intValue()); //Todo get network addresses - submissionQueue.add(eventId, EventType.ACCEPT, getChannel().getSocket().intValue(), 0, 0, 0); - ioUringEventLoop.addNewEvent(event); - + submissionQueue.addAccept(fd().intValue()); submissionQueue.submit(); } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java deleted file mode 100644 index 6ba2c6e210..0000000000 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2020 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. 
- */ -package io.netty.channel.uring; - -import io.netty.buffer.ByteBuf; - -final class Event { - private long id; - - private ByteBuf readBuffer; - - //Todo use fd instead - private AbstractIOUringChannel abstractIOUringChannel; - private EventType op; - - public AbstractIOUringChannel getAbstractIOUringChannel() { - return abstractIOUringChannel; - } - - public void setAbstractIOUringChannel(AbstractIOUringChannel abstractIOUringChannel) { - this.abstractIOUringChannel = abstractIOUringChannel; - } - - public ByteBuf getReadBuffer() { - return readBuffer; - } - - public void setReadBuffer(ByteBuf readBuffer) { - this.readBuffer = readBuffer; - } - - public long getId() { - return id; - } - - public void setId(final long id) { - this.id = id; - } - - public EventType getOp() { - return op; - } - - public void setOp(final EventType op) { - this.op = op; - } -} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/EventType.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/EventType.java deleted file mode 100644 index ea0af391cc..0000000000 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/EventType.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2020 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. 
- */ -package io.netty.channel.uring; - -enum EventType { - ACCEPT(13), - READ(22), - WRITE(23), - TIMEOUT(11), - POLL_EVENTFD(6), - POLL_LINK(6), - POLL_RDHUP(6), - POLL_OUT(6); - - private final int op; - - EventType(int op) { - this.op = op; - } - - public int getOp() { - return op; - } -} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java index 187101f54e..e90b6dc47a 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java @@ -20,7 +20,16 @@ import io.netty.util.internal.SystemPropertyUtil; final class IOUring { - private static final Throwable UNAVAILABILITY_CAUSE; + private static final Throwable UNAVAILABILITY_CAUSE; + static final int IO_POLL = 6; + static final int IO_TIMEOUT = 11; + static final int OP_ACCEPT = 13; + static final int OP_READ = 22; + static final int OP_WRITE = 23; + + static final int POLLMASK_LINK = 1; + static final int POLLMASK_OUT = 4; + static final int POLLMASK_RDHUP = 8192; static { Throwable cause = null; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java index 4d512280d8..3fa53e1d34 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java @@ -55,46 +55,54 @@ final class IOUringCompletionQueue { this.ringFd = ringFd; } - public IOUringCqe poll() { - long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); + public int process(IOUringCompletionQueueCallback callback) { + int i = 0; + for (;;) { + long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); + if (head != 
toUnsignedLong(PlatformDependent.getInt(kTailAddress))) { + long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress)); + long cqe = index * CQE_SIZE + completionQueueArrayAddress; - if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) { - long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress)); - long cqe = index * CQE_SIZE + completionQueueArrayAddress; + long udata = PlatformDependent.getLong(cqe + CQE_USER_DATA_FIELD); + int res = PlatformDependent.getInt(cqe + CQE_RES_FIELD); + long flags = toUnsignedLong(PlatformDependent.getInt(cqe + CQE_FLAGS_FIELD)); - long eventId = PlatformDependent.getLong(cqe + CQE_USER_DATA_FIELD); - int res = PlatformDependent.getInt(cqe + CQE_RES_FIELD); - long flags = toUnsignedLong(PlatformDependent.getInt(cqe + CQE_FLAGS_FIELD)); + //Ensure that the kernel only sees the new value of the head index after the CQEs have been read. + PlatformDependent.putIntOrdered(kHeadAddress, (int) (head + 1)); - //Ensure that the kernel only sees the new value of the head index after the CQEs have been read. - PlatformDependent.putIntOrdered(kHeadAddress, (int) (head + 1)); - - return new IOUringCqe(eventId, res, flags); - } - return null; + int fd = (int) (udata >> 32); + int opMask = (int) udata; + short op = (short) (opMask >> 16); + short mask = (short) opMask; + i++; + if (!callback.handle(fd, res, flags, op, mask)) { + break; + } + } else { + if (i == 0) { + return -1; + } + return i; + } + } + return i; } - public IOUringCqe ioUringWaitCqe() { - IOUringCqe ioUringCqe = poll(); - - if (ioUringCqe != null) { - return ioUringCqe; - } + interface IOUringCompletionQueueCallback { + boolean handle(int fd, int res, long flags, int op, int mask); + } + public boolean ioUringWaitCqe() { //IORING_ENTER_GETEVENTS -> wait until an event is completely processed int ret = Native.ioUringEnter(ringFd, 0, 1, IORING_ENTER_GETEVENTS); if (ret < 0) { //Todo throw exception! 
- return null; + return false; } else if (ret == 0) { - ioUringCqe = poll(); - - if (ioUringCqe != null) { - return ioUringCqe; - } + return true; } //Todo throw Exception! - return null; + return false; } public long getKHeadAddress() { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCqe.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCqe.java deleted file mode 100644 index 7e84d956f4..0000000000 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCqe.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2020 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. 
- */ -package io.netty.channel.uring; - -final class IOUringCqe { - private final long eventId; - private final int res; - private final long flags; - - IOUringCqe(long eventId, int res, long flags) { - this.eventId = eventId; - this.res = res; - this.flags = flags; - } - - public long getEventId() { - return this.eventId; - } - - public int getRes() { - return this.res; - } - - public long getFlags() { - return this.flags; - } -} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 4187a092d8..8244308824 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -15,16 +15,11 @@ */ package io.netty.channel.uring; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.unix.FileDescriptor; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; -import io.netty.util.collection.LongObjectHashMap; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -36,7 +31,8 @@ import java.util.concurrent.atomic.AtomicLong; import static io.netty.channel.unix.Errors.*; -final class IOUringEventLoop extends SingleThreadEventLoop { +final class IOUringEventLoop extends SingleThreadEventLoop implements + IOUringCompletionQueue.IOUringCompletionQueueCallback { private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class); //Todo set config ring buffer size @@ -48,7 +44,6 @@ final class IOUringEventLoop extends 
SingleThreadEventLoop { // events should be unique to identify which event type that was private long eventIdCounter; - private final LongObjectHashMap events = new LongObjectHashMap(); private final IntObjectMap channels = new IntObjectHashMap(4096); private final RingBuffer ringBuffer; @@ -84,13 +79,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop { : PlatformDependent.newMpscQueue(maxPendingTasks); } - public long incrementEventIdCounter() { - long eventId = eventIdCounter; - logger.trace("incrementEventIdCounter EventId: {}", eventId); - eventIdCounter++; - return eventId; - } - public void add(AbstractIOUringChannel ch) { logger.trace("Add Channel: {} ", ch.socket.intValue()); int fd = ch.socket.intValue(); @@ -123,22 +111,13 @@ final class IOUringEventLoop extends SingleThreadEventLoop { } } - public void addNewEvent(Event event) { - events.put(event.getId(), event); - } - @Override protected void run() { final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue(); final IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); // Lets add the eventfd related events before starting to do any real work. - long eventId = incrementEventIdCounter(); - Event event = new Event(); - event.setOp(EventType.POLL_EVENTFD); - event.setId(eventId); - addNewEvent(event); - submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp()); + submissionQueue.addPollLink(eventfd.intValue()); submissionQueue.submit(); for (;;) { @@ -148,7 +127,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); - IOUringCqe ioUringCqe; // Only submit a timeout if there are no tasks to process and do a blocking operation // on the completionQueue. 
@@ -156,43 +134,24 @@ final class IOUringEventLoop extends SingleThreadEventLoop { try { if (curDeadlineNanos != prevDeadlineNanos) { prevDeadlineNanos = curDeadlineNanos; - event = new Event(); - eventId = incrementEventIdCounter(); - event.setId(eventId); - event.setOp(EventType.TIMEOUT); - addNewEvent(event); - submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), eventId); + submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos)); submissionQueue.submit(); } - // Block if there is nothing to process. - logger.trace("ioUringWaitCqe {}", this.toString()); - ioUringCqe = completionQueue.ioUringWaitCqe(); + // Check there were any completion events to process + if (completionQueue.process(this) == -1) { + // Block if there is nothing to process after this try again to call process(....) + logger.trace("ioUringWaitCqe {}", this.toString()); + completionQueue.ioUringWaitCqe(); + } } finally { - if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) { pendingWakeup = true; } } - } else { - // Just poll as there are tasks to process so we don't want to block. - logger.trace("poll {}", this.toString()); - ioUringCqe = completionQueue.poll(); } - while (ioUringCqe != null) { - event = events.get(ioUringCqe.getEventId()); - - if (event != null) { - logger.trace("EventType Incoming: " + event.getOp().name()); - processEvent(ioUringCqe.getRes(), event); - } - - // Process one entry after the other until there are none left. This will ensure we process - // all of these before we try to consume tasks. - ioUringCqe = completionQueue.poll(); - logger.trace("poll {}", this.toString()); - } + completionQueue.process(this); if (hasTasks()) { runAllTasks(); @@ -211,145 +170,77 @@ final class IOUringEventLoop extends SingleThreadEventLoop { } } - private void processEvent(final int res, final Event event) { - // Remove the id first so we not end up with invalid entries in any cases. 
- this.events.remove(event.getId()); - + @Override + public boolean handle(int fd, int res, long flags, int op, int pollMask) { IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); - switch (event.getOp()) { - case ACCEPT: - logger.trace("EventLoop Accept filedescriptor: {}", res); - event.getAbstractIOUringChannel().setUringInReadyPending(false); - if (res != -1 && res != ERRNO_EAGAIN_NEGATIVE && - res != ERRNO_EWOULDBLOCK_NEGATIVE) { - AbstractIOUringServerChannel abstractIOUringServerChannel = - (AbstractIOUringServerChannel) event.getAbstractIOUringChannel(); - logger.trace("server filedescriptor Fd: {}", abstractIOUringServerChannel.getSocket().intValue()); - final IOUringRecvByteAllocatorHandle allocHandle = - (IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe() - .recvBufAllocHandle(); - final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline(); - - allocHandle.lastBytesRead(res); - if (allocHandle.lastBytesRead() > 0) { - allocHandle.incMessagesRead(1); - try { - final Channel childChannel = - abstractIOUringServerChannel.newChildChannel(allocHandle.lastBytesRead()); - pipeline.fireChannelRead(childChannel); - pollRdHup((AbstractIOUringChannel) childChannel); - } catch (Exception e) { - e.printStackTrace(); - } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - } - } - - //Todo refactoring method name - event.getAbstractIOUringChannel().executeReadEvent(); - break; - case READ: - boolean close = false; - ByteBuf byteBuf = null; - int localReadAmount = res; - final IOUringRecvByteAllocatorHandle allocHandle = - (IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe() - .recvBufAllocHandle(); - final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline(); - try { - logger.trace("EventLoop Read Res: {}", res); - logger.trace("EventLoop Fd: {}", event.getAbstractIOUringChannel().getSocket().intValue()); - 
event.getAbstractIOUringChannel().setUringInReadyPending(false); - byteBuf = event.getReadBuffer(); - if (localReadAmount > 0) { - byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); - } - - allocHandle.lastBytesRead(localReadAmount); - if (allocHandle.lastBytesRead() <= 0) { - // nothing was read, release the buffer. - byteBuf.release(); - byteBuf = null; - close = allocHandle.lastBytesRead() < 0; - if (close) { - // There is nothing left to read as we received an EOF. - event.getAbstractIOUringChannel().shutdownInput(false); - } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); + switch (op) { + case IOUring.OP_ACCEPT: + AbstractIOUringServerChannel acceptChannel = (AbstractIOUringServerChannel) channels.get(fd); + if (acceptChannel == null) { break; } - - allocHandle.incMessagesRead(1); - pipeline.fireChannelRead(byteBuf); - byteBuf = null; - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - - logger.trace("READ autoRead {}", event.getAbstractIOUringChannel().config().isAutoRead()); - if (event.getAbstractIOUringChannel().config().isAutoRead()) { - event.getAbstractIOUringChannel().executeReadEvent(); + logger.trace("EventLoop Accept filedescriptor: {}", res); + acceptChannel.setUringInReadyPending(false); + if (res != -1 && res != ERRNO_EAGAIN_NEGATIVE && + res != ERRNO_EWOULDBLOCK_NEGATIVE) { + logger.trace("server filedescriptor Fd: {}", fd); + acceptChannel.acceptComplete(res); + pollRdHup(res); } - } catch (Throwable t) { - handleReadException(event.getAbstractIOUringChannel(), pipeline, byteBuf, t, close, allocHandle); - } - break; - case WRITE: - //localFlushAmount -> res - logger.trace("EventLoop Write Res: {}", res); - logger.trace("EventLoop Fd: {}", event.getAbstractIOUringChannel().getSocket().intValue()); - ChannelOutboundBuffer channelOutboundBuffer = event - .getAbstractIOUringChannel().unsafe().outboundBuffer(); - AbstractIOUringChannel channel = event.getAbstractIOUringChannel(); - - if (res == 
SOCKET_ERROR_EPIPE) { - event.getAbstractIOUringChannel().shutdownInput(false); break; - } - - if (res > 0) { - channelOutboundBuffer.removeBytes(res); - channel.setWriteable(true); - try { - event.getAbstractIOUringChannel().doWrite(channelOutboundBuffer); - } catch (Exception e) { - e.printStackTrace(); + case IOUring.OP_READ: + AbstractIOUringChannel readChannel = channels.get(fd); + if (readChannel == null) { + break; } - } - break; - case TIMEOUT: - if (res == ETIME) { - prevDeadlineNanos = NONE; - } + readChannel.readComplete(res); + break; + case IOUring.OP_WRITE: + AbstractIOUringChannel writeChannel = channels.get(fd); + if (writeChannel == null) { + break; + } + //localFlushAmount -> res + logger.trace("EventLoop Write Res: {}", res); + logger.trace("EventLoop Fd: {}", fd); - break; - case POLL_EVENTFD: - pendingWakeup = false; - // We need to consume the data as otherwise we would see another event in the completionQueue without - // an extra eventfd_write(....) - Native.eventFdRead(eventfd.intValue()); - Event eventfdEvent = new Event(); - eventfdEvent.setId(incrementEventIdCounter()); - eventfdEvent.setOp(EventType.POLL_EVENTFD); - addNewEvent(eventfdEvent); - submissionQueue.addPoll(eventfdEvent.getId(), eventfd.intValue(), eventfdEvent.getOp()); - // Submit so its picked up - submissionQueue.submit(); - break; - case POLL_LINK: - //Todo error handling error - logger.trace("POLL_LINK Res: {}", res); - break; - case POLL_RDHUP: - if (!event.getAbstractIOUringChannel().isActive()) { - event.getAbstractIOUringChannel().shutdownInput(true); - } - break; - case POLL_OUT: - logger.trace("POLL_OUT Res: {}", res); - break; + if (res == SOCKET_ERROR_EPIPE) { + writeChannel.shutdownInput(false); + } else { + writeChannel.writeComplete(res); + } + break; + case IOUring.IO_TIMEOUT: + if (res == ETIME) { + prevDeadlineNanos = NONE; + } + + break; + case IOUring.IO_POLL: + if (eventfd.intValue() == fd) { + pendingWakeup = false; + // We need to consume the data as 
otherwise we would see another event in the completionQueue without + // an extra eventfd_write(....) + Native.eventFdRead(eventfd.intValue()); + submissionQueue.addPollLink(eventfd.intValue()); + // Submit so its picked up + submissionQueue.submit(); + } else { + if (pollMask == IOUring.POLLMASK_RDHUP) { + AbstractIOUringChannel channel = channels.get(fd); + if (channel != null && !channel.isActive()) { + channel.shutdownInput(true); + } + } else { + //Todo error handling error + logger.trace("POLL_LINK Res: {}", res); + break; + } + } + + break; } + return true; } @Override @@ -374,38 +265,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop { } } - private void handleReadException(AbstractIOUringChannel channel, ChannelPipeline pipeline, ByteBuf byteBuf, - Throwable cause, boolean close, - IOUringRecvByteAllocatorHandle allocHandle) { - if (byteBuf != null) { - if (byteBuf.isReadable()) { - pipeline.fireChannelRead(byteBuf); - } else { - byteBuf.release(); - } - } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(cause); - if (close || cause instanceof IOException) { - channel.shutdownInput(false); - } else { - if (channel.config().isAutoRead()) { - channel.executeReadEvent(); - } - } - } - //to be notified when the filedesciptor is closed - private void pollRdHup(AbstractIOUringChannel channel) { + private void pollRdHup(int fd) { //all childChannels should poll POLLRDHUP - long eventId = incrementEventIdCounter(); - Event event = new Event(); - event.setOp(EventType.POLL_RDHUP); - event.setId(eventId); - event.setAbstractIOUringChannel(channel); - addNewEvent(event); - ringBuffer.getIoUringSubmissionQueue().addPoll(eventId, channel.socket.intValue(), event.getOp()); + ringBuffer.getIoUringSubmissionQueue().addPollRdHup(fd); ringBuffer.getIoUringSubmissionQueue().submit(); } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java 
b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 09c36fcc3a..e91a9f2673 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -28,9 +28,6 @@ final class IOUringSubmissionQueue { private static final int SQE_SIZE = 64; private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support? private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec - private static final int POLLIN = 1; - private static final int POLLRDHUP = 8192; - private static final int POLLOUT = 4; private static final int IOSQE_IO_LINK = 4; @@ -109,26 +106,31 @@ final class IOUringSubmissionQueue { return sqe; } - private void setData(long sqe, long eventId, EventType type, int fd, long bufferAddress, int length, long offset) { + private void setData(long sqe, byte op, int pollMask, int fd, long bufferAddress, int length, long offset) { //Todo cleaner //set sqe(submission queue) properties - PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) type.getOp()); + PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, op); PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0); PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd); PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset); PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress); PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length); - PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId); + + // Store the fd and event type in the user_data field + int opMask = (((short) op) << 16) | (((short) pollMask) & 0xFFFF); + long uData = (long)fd << 32 | opMask & 0xFFFFFFFFL; + + PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData); //pollread or accept operation - if (type == EventType.POLL_LINK || type == EventType.POLL_OUT) { + if (op == 6 && (pollMask == IOUring.POLLMASK_OUT || pollMask == 
IOUring.POLLMASK_LINK)) { PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) IOSQE_IO_LINK); } else { PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); } //c union set Rw-Flags or accept_flags - if (type != EventType.ACCEPT) { + if (op != IOUring.OP_ACCEPT) { PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); } else { //accept_flags set NON_BLOCKING @@ -142,56 +144,74 @@ final class IOUringSubmissionQueue { offsetIndex += 8; } - logger.trace("OPField: {}", type.name()); + if (pollMask != 0) { + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask); + } + logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD)); logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD)); logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD)); logger.trace("Offset: {}", PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD)); } - public boolean addTimeout(long nanoSeconds, long eventId) { + public boolean addTimeout(long nanoSeconds) { long sqe = getSqe(); if (sqe == 0) { return false; } setTimeout(nanoSeconds); - setData(sqe, eventId, EventType.TIMEOUT, -1, timeoutMemoryAddress, 1, 0); + setData(sqe, (byte) IOUring.IO_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0); return true; } - public boolean addPoll(long eventId, int fd, EventType eventType) { + public boolean addPollLink(int fd) { + return addPoll(fd, IOUring.POLLMASK_LINK); + } + + + public boolean addPollOut(int fd) { + return addPoll(fd, IOUring.POLLMASK_OUT); + } + + + public boolean addPollRdHup(int fd) { + return addPoll(fd, IOUring.POLLMASK_RDHUP); + } + + private boolean addPoll(int fd, int pollMask) { long sqe = getSqe(); if (sqe == 0) { return false; } - int pollMask; - switch (eventType) { - case POLL_EVENTFD: - case POLL_LINK: - pollMask = POLLIN; - break; - case POLL_OUT: - pollMask = POLLOUT; - break; - case POLL_RDHUP: - pollMask = POLLRDHUP; - break; - default: - //Todo exeception - return false; - } - setData(sqe, 
eventId, eventType, fd, 0, 0, 0); - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask); + + setData(sqe, (byte) IOUring.IO_POLL, pollMask, fd, 0, 0, 0); return true; } - //Todo ring buffer errors for example if submission queue is full - public boolean add(long eventId, EventType type, int fd, long bufferAddress, int pos, int limit) { + public boolean addRead(int fd, long bufferAddress, int pos, int limit) { long sqe = getSqe(); if (sqe == 0) { return false; } - setData(sqe, eventId, type, fd, bufferAddress + pos, limit - pos, 0); + setData(sqe, (byte) IOUring.OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0); + return true; + } + + public boolean addWrite(int fd, long bufferAddress, int pos, int limit) { + long sqe = getSqe(); + if (sqe == 0) { + return false; + } + setData(sqe, (byte) IOUring.OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0); + return true; + } + + public boolean addAccept(int fd) { + long sqe = getSqe(); + if (sqe == 0) { + return false; + } + setData(sqe, (byte) IOUring.OP_ACCEPT, 0, fd, 0,0,0); return true; } diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketFixedLengthEchoTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketFixedLengthEchoTest.java new file mode 100644 index 0000000000..302bfb51e0 --- /dev/null +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketFixedLengthEchoTest.java @@ -0,0 +1,16 @@ +package io.netty.channel.uring; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketFixedLengthEchoTest; + +import java.util.List; + +public class IOUringSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest { + + @Override + protected List> newFactories() { + return IOUringSocketTestPermutation.INSTANCE.socket(); + } +} diff --git 
a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index c6e3bc35bc..f0d41ac09c 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -31,8 +31,8 @@ public class NativeTest { final long eventId = 1; ByteBufAllocator allocator = new UnpooledByteBufAllocator(true); - ByteBuf writeEventByteBuf = allocator.directBuffer(100); - String inputString = "Hello World!"; + final ByteBuf writeEventByteBuf = allocator.directBuffer(100); + final String inputString = "Hello World!"; writeEventByteBuf.writeCharSequence(inputString, Charset.forName("UTF-8")); int fd = Native.createFile(); @@ -45,26 +45,34 @@ public class NativeTest { assertNotNull(submissionQueue); assertNotNull(completionQueue); - assertTrue(submissionQueue.add(eventId, EventType.WRITE, fd, writeEventByteBuf.memoryAddress(), + assertTrue(submissionQueue.addWrite(fd, writeEventByteBuf.memoryAddress(), writeEventByteBuf.readerIndex(), writeEventByteBuf.writerIndex())); submissionQueue.submit(); - IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); - assertNotNull(ioUringCqe); - assertEquals(inputString.length(), ioUringCqe.getRes()); - assertEquals(1, ioUringCqe.getEventId()); - writeEventByteBuf.release(); + assertTrue(completionQueue.ioUringWaitCqe()); + assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { + @Override + public boolean handle(int fd, int res, long flags, int op, int mask) { + assertEquals(inputString.length(), res); + writeEventByteBuf.release(); + return true; + } + })); - ByteBuf readEventByteBuf = allocator.directBuffer(100); - assertTrue(submissionQueue.add(eventId + 1, EventType.READ, fd, readEventByteBuf.memoryAddress(), + final ByteBuf readEventByteBuf = allocator.directBuffer(100); + 
assertTrue(submissionQueue.addRead(fd, readEventByteBuf.memoryAddress(), readEventByteBuf.writerIndex(), readEventByteBuf.capacity())); submissionQueue.submit(); - ioUringCqe = completionQueue.ioUringWaitCqe(); - assertEquals(2, ioUringCqe.getEventId()); - assertEquals(inputString.length(), ioUringCqe.getRes()); - - readEventByteBuf.writerIndex(ioUringCqe.getRes()); + assertTrue(completionQueue.ioUringWaitCqe()); + assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { + @Override + public boolean handle(int fd, int res, long flags, int op, int mask) { + assertEquals(inputString.length(), res); + readEventByteBuf.writerIndex(res); + return true; + } + })); byte[] dataRead = new byte[inputString.length()]; readEventByteBuf.readBytes(dataRead); @@ -86,9 +94,14 @@ public class NativeTest { Thread thread = new Thread() { @Override public void run() { - final IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); - assertEquals(-62, ioUringCqe.getRes()); - assertEquals(1, ioUringCqe.getEventId()); + assertTrue(completionQueue.ioUringWaitCqe()); + completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { + @Override + public boolean handle(int fd, int res, long flags, int op, int mask) { + assertEquals(-62, res); + return true; + } + }); } }; thread.start(); @@ -98,7 +111,7 @@ public class NativeTest { e.printStackTrace(); } - submissionQueue.addTimeout(0, 1); + submissionQueue.addTimeout(0); submissionQueue.submit(); } @@ -114,7 +127,7 @@ public class NativeTest { assertNotNull(completionQueue); final FileDescriptor eventFd = Native.newEventFd(); - assertTrue(submissionQueue.addPoll(1, eventFd.intValue(), EventType.POLL_EVENTFD)); + assertTrue(submissionQueue.addPollLink(eventFd.intValue())); submissionQueue.submit(); new Thread() { @@ -124,9 +137,14 @@ public class NativeTest { } }.start(); - IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); - assertEquals(1, ioUringCqe.getRes()); - 
assertEquals(1, ioUringCqe.getEventId()); + assertTrue(completionQueue.ioUringWaitCqe()); + assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { + @Override + public boolean handle(int fd, int res, long flags, int op, int mask) { + assertEquals(1, res); + return true; + } + })); } //Todo clean @@ -146,14 +164,19 @@ public class NativeTest { Thread waitingCqe = new Thread() { @Override public void run() { - IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); - assertEquals(1, ioUringCqe.getRes()); - assertEquals(1, ioUringCqe.getEventId()); + assertTrue(completionQueue.ioUringWaitCqe()); + assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { + @Override + public boolean handle(int fd, int res, long flags, int op, int mask) { + assertEquals(1, res); + return true; + } + })); } }; waitingCqe.start(); final FileDescriptor eventFd = Native.newEventFd(); - assertTrue(submissionQueue.addPoll(1, eventFd.intValue(), EventType.POLL_EVENTFD)); + assertTrue(submissionQueue.addPollLink(eventFd.intValue())); submissionQueue.submit(); new Thread() {