diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 23036a1d30..ec132538bd 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -511,7 +511,7 @@ public final class PlatformDependent { return PlatformDependent0.getInt(address); } - public static int getIntVolatalile(long address) { + public static int getIntVolatile(long address) { return PlatformDependent0.getIntVolatile(address); } diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java index 43c4030c12..1d3f69a666 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java @@ -605,18 +605,6 @@ final class PlatformDependent0 { UNSAFE.putObject(o, offset, x); } - static void loadFence() { - UNSAFE.loadFence(); - } - - static void storeFence() { - UNSAFE.storeFence(); - } - - static void fullFence() { - UNSAFE.fullFence(); - } - static void copyMemory(long srcAddr, long dstAddr, long length) { // Manual safe-point polling is only needed prior Java9: // See https://bugs.openjdk.java.net/browse/JDK-8149596 diff --git a/transport-native-io_uring/src/main/c/io_uring.h b/transport-native-io_uring/src/main/c/io_uring.h index a5e7020af4..e53d6d3067 100644 --- a/transport-native-io_uring/src/main/c/io_uring.h +++ b/transport-native-io_uring/src/main/c/io_uring.h @@ -64,4 +64,4 @@ struct io_uring { int ring_fd; }; -#endif \ No newline at end of file +#endif diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index 5e1757cfcc..79da5ede2c 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -294,4 +294,4 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) { //unload return NETTY_JNI_VERSION; -} \ No newline at end of file +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 8edc0204c6..b409f566d2 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -32,6 +32,7 @@ import io.netty.channel.unix.UnixChannel; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.ReferenceCountUtil; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.UnresolvedAddressException; @@ -164,6 +165,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha unsafe.executeUringReadOperator(); } + //Channel/ChannelHandlerContext.write @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { //Todo write until there is nothing left in the buffer @@ -240,6 +242,16 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha throw new UnsupportedOperationException("unsupported message type"); } + @Override + protected void doRegister() throws Exception { + ((IOUringEventLoop) eventLoop()).add(this); + } + + @Override + protected void doDeregister() throws Exception { + ((IOUringEventLoop) eventLoop()).remove(this); + } + @Override public void doBind(final SocketAddress local) throws Exception { if (local instanceof InetSocketAddress) { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java index eba1f9fe5c..6ba2c6e210 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java @@ -21,6 +21,8 @@ final class Event { private long id; private ByteBuf readBuffer; + + //Todo use fd instead private AbstractIOUringChannel abstractIOUringChannel; private EventType op; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java index aeafd1cb43..572a4dc467 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java @@ -57,7 +57,7 @@ final class IOUringCompletionQueue { public IOUringCqe peek() { long cqe = 0; - long head = toUnsignedLong(PlatformDependent.getIntVolatalile(kHeadAddress)); + long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) { long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress)); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 36914c556c..17ff533799 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -19,8 +19,11 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SingleThreadEventLoop; +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.LongObjectHashMap; +import java.util.Set; import java.util.concurrent.Executor; import static io.netty.channel.unix.Errors.*; @@ -30,6 +33,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop { // events should be unique to identify which event type that was private long eventIdCounter; private final LongObjectHashMap events = new LongObjectHashMap(); + private final IntObjectMap channels = new IntObjectHashMap(4096); private RingBuffer ringBuffer; IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { @@ -44,6 +48,38 @@ final class IOUringEventLoop extends SingleThreadEventLoop { return eventId; } + public void add(AbstractIOUringChannel ch) { + System.out.println("Add Channel: " + ch.socket.intValue()); + int fd = ch.socket.intValue(); + + channels.put(fd, ch); + } + + public void remove(AbstractIOUringChannel ch) { + System.out.println("Remove Channel: " + ch.socket.intValue()); + int fd = ch.socket.intValue(); + + AbstractIOUringChannel old = channels.remove(fd); + if (old != null && old != ch) { + // The Channel mapping was already replaced due FD reuse, put back the stored Channel. + channels.put(fd, old); + + // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed. + assert !ch.isOpen(); + } + } + + private void closeAll() { + System.out.println("CloseAll IOUringEvenloop"); + // Using the intermediate collection to prevent ConcurrentModificationException. + // In the `close()` method, the channel is deleted from `channels` map. + AbstractIOUringChannel[] localChannels = channels.values().toArray(new AbstractIOUringChannel[0]); + + for (AbstractIOUringChannel ch : localChannels) { + ch.unsafe().close(ch.unsafe().voidPromise()); + } + } + public void addNewEvent(Event event) { events.put(event.getId(), event); } @@ -73,7 +109,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop { final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline(); allocHandle.lastBytesRead(ioUringCqe.getRes()); - if (allocHandle.lastBytesRead() != -1) { + if (allocHandle.lastBytesRead() > 0) { allocHandle.incMessagesRead(1); try { pipeline.fireChannelRead(abstractIOUringServerChannel @@ -96,8 +132,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop { ringBuffer.getIoUringSubmissionQueue().submit(); break; case READ: - System.out.println("Eventlloop Read Res: " + ioUringCqe.getRes()); - System.out.println("Eventloop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd()); + System.out.println("EventLoop Read Res: " + ioUringCqe.getRes()); + System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd()); ByteBuf byteBuf = event.getReadBuffer(); int localReadAmount = ioUringCqe.getRes(); if (localReadAmount > 0) { @@ -126,8 +162,9 @@ final class IOUringEventLoop extends SingleThreadEventLoop { event.getAbstractIOUringChannel().executeReadEvent(); break; case WRITE: - System.out.println("Eventloop Write Res: " + ioUringCqe.getRes()); - System.out.println("Eventloop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd()); + System.out.println("EventLoop Write Res: " + ioUringCqe.getRes()); + System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd()); + System.out.println("EventLoop Pipeline: " + event.getAbstractIOUringChannel().eventLoop()); //remove bytes int localFlushAmount = ioUringCqe.getRes(); if (localFlushAmount > 0) { @@ -135,14 +172,23 @@ final class IOUringEventLoop extends SingleThreadEventLoop { } break; } - } else { - System.out.println("Event is null!!!! "); } } //run tasks if (hasTasks()) { runAllTasks(); } + + try { + if (isShuttingDown()) { + closeAll(); + if (confirmShutdown()) { + break; + } + } + } catch (Throwable t) { + System.out.println("Exception error " + t); + } try { Thread.sleep(10); } catch (InterruptedException e) { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java index 544ce46bfb..65607583a8 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java @@ -27,7 +27,7 @@ public final class IOUringServerSocketChannel extends AbstractIOUringServerChann private final IOUringServerSocketChannelConfig config; public IOUringServerSocketChannel() { - super(Socket.newSocketStreamBlocking().getFd()); + super(Socket.newSocketStream().intValue()); this.config = new IOUringServerSocketChannelConfig(this); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 6962fa9e95..674cd6c852 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -53,6 +53,9 @@ final class IOUringSubmissionQueue { private final long ringAddress; private final int ringFd; + private static final int SOCK_NONBLOCK = 2048; + private static final int SOCK_CLOEXEC = 524288; + IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress, long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress, int ringSize, @@ -92,9 +95,16 @@ final class IOUringSubmissionQueue { PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset); PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress); PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length); - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId); + //c union set Rw-Flags or accept_flags + if (type != EventType.ACCEPT) { + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); + } else { + //accept_flags set NON_BLOCKING + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC); + } + // pad field array -> all fields should be zero long offsetIndex = 0; for (int i = 0; i < 3; i++) { @@ -124,7 +134,7 @@ final class IOUringSubmissionQueue { private int flushSqe() { long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress)); - long kHead = toUnsignedLong(PlatformDependent.getIntVolatalile(kHeadAddress)); + long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress)); System.out.println("Ktail: " + kTail); diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java index a1dd9738ea..66aa029d1e 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java @@ -232,10 +232,5 @@ public class FileDescriptor { private static native int read(int fd, ByteBuffer buf, int pos, int limit); private static native int readAddress(int fd, long address, int pos, int limit); - //only temporary - public int getFd() { - return fd; - } - private static native long newPipe(); }