diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 9f324fc654..23036a1d30 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -471,18 +471,6 @@ public final class PlatformDependent { return new ConcurrentHashMap(map); } - public static void loadFence() { - PlatformDependent0.loadFence(); - } - - public static void storeFence() { - PlatformDependent0.storeFence(); - } - - public static void fullFence() { - PlatformDependent0.fullFence(); - } - /** * Try to deallocate the specified direct {@link ByteBuffer}. Please note this method does nothing if * the current platform does not support this operation or the specified buffer is not a direct buffer. @@ -523,6 +511,14 @@ public final class PlatformDependent { return PlatformDependent0.getInt(address); } + public static int getIntVolatalile(long address) { + return PlatformDependent0.getIntVolatile(address); + } + + public static void putIntOrdered(long adddress, int newValue) { + PlatformDependent0.putIntOrdered(adddress, newValue); + } + public static long getLong(long address) { return PlatformDependent0.getLong(address); } diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java index 662b6c92bc..43c4030c12 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java @@ -529,6 +529,14 @@ final class PlatformDependent0 { return UNSAFE.getInt(address); } + static int getIntVolatile(long address) { + return UNSAFE.getIntVolatile(null, address); + } + + static void putIntOrdered(long adddress, int newValue) { + UNSAFE.putOrderedInt(null, adddress, newValue); + } + static long getLong(long address) { return UNSAFE.getLong(address); } diff --git a/example/pom.xml b/example/pom.xml index 708897cf9e..c0b20488d1 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -43,6 +43,13 @@ netty-buffer ${project.version} + + + ${project.groupId} + netty-transport-native-io_uring + ${project.version} + linux-x86_64 + ${project.groupId} netty-transport diff --git a/example/src/main/java/io/netty/example/uring/EchoIOUringServer.java b/example/src/main/java/io/netty/example/uring/EchoIOUringServer.java new file mode 100644 index 0000000000..23afca30d3 --- /dev/null +++ b/example/src/main/java/io/netty/example/uring/EchoIOUringServer.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.uring; + +import io.netty.bootstrap.ServerBootstrap; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.uring.IOUringEventLoopGroup; +import io.netty.channel.uring.IOUringServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +//temporary prototype example +public class EchoIOUringServer { + private static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); + + public static void main(String []args) { + EventLoopGroup bossGroup = new IOUringEventLoopGroup(1); + EventLoopGroup workerGroup = new IOUringEventLoopGroup(1); + final EchoIOUringServerHandler serverHandler = new EchoIOUringServerHandler(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(IOUringServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + //p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(serverHandler); + } + }); + + // Start the server. + ChannelFuture f = b.bind(PORT).sync(); + + // Wait until the server socket is closed. + f.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + // Shut down all event loops to terminate all threads. + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } +} diff --git a/example/src/main/java/io/netty/example/uring/EchoIOUringServerHandler.java b/example/src/main/java/io/netty/example/uring/EchoIOUringServerHandler.java new file mode 100644 index 0000000000..5828df91fe --- /dev/null +++ b/example/src/main/java/io/netty/example/uring/EchoIOUringServerHandler.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.uring; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; + +//temporary prototype example +@Sharable +public class EchoIOUringServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } + +} diff --git a/run-example.sh b/run-example.sh index 3159bff2ec..60a8dd34cb 100755 --- a/run-example.sh +++ b/run-example.sh @@ -1,5 +1,6 @@ #!/bin/bash -e EXAMPLE_MAP=( + 'uring:io.netty.example.uring.EchoIOUringServer' 'discard-client:io.netty.example.discard.DiscardClient' 'discard-server:io.netty.example.discard.DiscardServer' 'echo-client:io.netty.example.echo.EchoClient' diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index c17c4b1e41..42c94466ee 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -565,7 +565,7 @@ static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) { } memset(dynamicMethods, 0, size); memcpy(dynamicMethods, fixed_method_table, sizeof(fixed_method_table)); - + JNINativeMethod* dynamicMethod = &dynamicMethods[fixed_method_table_size]; NETTY_PREPEND(packagePrefix, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket;II)I", dynamicTypeName, error); NETTY_PREPEND("(IZ[L", dynamicTypeName, dynamicMethod->signature, error); diff --git a/transport-native-io_uring/src/main/c/io_uring.h b/transport-native-io_uring/src/main/c/io_uring.h index fba84d5ff2..5a322d05c7 100644 --- a/transport-native-io_uring/src/main/c/io_uring.h +++ b/transport-native-io_uring/src/main/c/io_uring.h @@ -1,9 +1,27 @@ -/* SPDX-License-Identifier: MIT */ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ #include #include #include #include - +#include "netty_unix_errors.h" +#include "netty_unix_filedescriptor.h" +#include "netty_unix_jni.h" +#include "netty_unix_socket.h" +#include "netty_unix_util.h" #ifndef LIB_TEST #define LIB_TEST diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index b1af413587..366235a2e6 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -1,10 +1,20 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ #define _GNU_SOURCE // RTLD_DEFAULT #include "io_uring.h" -#include "netty_unix_errors.h" -#include "netty_unix_filedescriptor.h" -#include "netty_unix_jni.h" -#include "netty_unix_socket.h" -#include "netty_unix_util.h" #include #include #include diff --git a/transport-native-io_uring/src/main/c/syscall.c b/transport-native-io_uring/src/main/c/syscall.c index 0d0871b19c..e9b8e1289a 100644 --- a/transport-native-io_uring/src/main/c/syscall.c +++ b/transport-native-io_uring/src/main/c/syscall.c @@ -1,6 +1,17 @@ -/* SPDX-License-Identifier: MIT */ /* - * Will go away once libc support is there + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. */ #include "syscall.h" #include diff --git a/transport-native-io_uring/src/main/c/syscall.h b/transport-native-io_uring/src/main/c/syscall.h index ffef0b95e2..c3562b0824 100644 --- a/transport-native-io_uring/src/main/c/syscall.h +++ b/transport-native-io_uring/src/main/c/syscall.h @@ -1,4 +1,18 @@ -/* SPDX-License-Identifier: MIT */ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ #include #include #ifndef LIBURING_SYSCALL_H diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index ada5b73ce9..3057a1d751 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -24,8 +24,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.UnixChannel; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.ReferenceCountUtil; @@ -36,17 +38,26 @@ import java.nio.channels.UnresolvedAddressException; import static io.netty.util.internal.ObjectUtil.*; -public abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel { - private volatile SocketAddress local; +abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); final LinuxSocket socket; protected volatile boolean active; boolean uringInReadyPending; + private volatile SocketAddress local; + private volatile SocketAddress remote; + AbstractIOUringChannel(final Channel parent, LinuxSocket fd) { super(parent); this.socket = checkNotNull(fd, "fd"); this.active = true; + + if (active) { + // Directly cache the remote and local addresses + // See https://github.com/netty/netty/issues/2359 + this.local = fd.localAddress(); + this.remote = fd.remoteAddress(); + } } public boolean isOpen() { @@ -63,6 +74,11 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements return METADATA; } + @Override + public FileDescriptor fd() { + return socket; + } + @Override protected abstract AbstractUringUnsafe newUnsafe(); @@ -86,6 +102,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements event.setAbstractIOUringChannel(this); submissionQueue.add(eventId, EventType.READ, socket.getFd(), byteBuf.memoryAddress(), byteBuf.writerIndex(), byteBuf.capacity()); + ioUringEventLoop.addNewEvent(event); + submissionQueue.submit(); } } @@ -128,6 +146,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements @Override protected void doClose() throws Exception { + socket.close(); } // Channel/ChannelHandlerContext.read() was called @@ -140,9 +159,15 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements } } + public void executeReadEvent() { + final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe(); + unsafe.executeUringReadOperator(); + } + @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { - if (in.size() == 1) { + //Todo write until there is nothing left in the buffer + if (in.size() >= 1) { Object msg = in.current(); if (msg instanceof ByteBuf) { doWriteBytes((ByteBuf) msg); @@ -150,7 +175,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements } } - protected final void doWriteBytes(ByteBuf buf) throws Exception { + protected final void doWriteBytes(ByteBuf buf) { if (buf.hasMemoryAddress()) { IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); @@ -161,6 +186,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements event.setAbstractIOUringChannel(this); submissionQueue.add(eventId, EventType.WRITE, socket.getFd(), buf.memoryAddress(), buf.readerIndex(), buf.writerIndex()); + ioUringEventLoop.addNewEvent(event); + submissionQueue.submit(); } } @@ -174,26 +201,23 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements } }; - /** - * Create a new {@link } instance. - * - * @param handle The handle to wrap with EPOLL specific logic. - */ - IOUringRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) { + IOUringRecvByteAllocatorHandle newIOUringHandle(RecvByteBufAllocator.ExtendedHandle handle) { return new IOUringRecvByteAllocatorHandle(handle); } @Override public IOUringRecvByteAllocatorHandle recvBufAllocHandle() { if (allocHandle == null) { - allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle()); + allocHandle = newIOUringHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle()); } return allocHandle; } + //Todo @Override public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + promise.setFailure(new Exception()); } final void executeUringReadOperator() { @@ -217,7 +241,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements } @Override - public void doBind(final SocketAddress localAddress) throws Exception { + public void doBind(final SocketAddress local) throws Exception { if (local instanceof InetSocketAddress) { checkResolvable((InetSocketAddress) local); } @@ -236,15 +260,15 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements @Override protected SocketAddress localAddress0() { - return null; + return local; } @Override protected SocketAddress remoteAddress0() { - return null; + return remote; } public LinuxSocket getSocket() { return socket; } -} \ No newline at end of file +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 9433dcb3a0..f320ffff56 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -16,19 +16,15 @@ package io.netty.channel.uring; import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.ServerChannel; -import io.netty.channel.unix.FileDescriptor; import java.net.SocketAddress; -public abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel { +abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel { - private volatile SocketAddress local; - - AbstractIOUringServerChannel(int fd) { + AbstractIOUringServerChannel(int fd) { super(null, new LinuxSocket(fd)); } @@ -41,31 +37,16 @@ public abstract class AbstractIOUringServerChannel extends AbstractIOUringChanne return new UringServerChannelUnsafe(); } - @Override - protected SocketAddress localAddress0() { - return null; - } - - @Override - protected SocketAddress remoteAddress0() { - return null; - } - @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } - @Override - public FileDescriptor fd() { - return null; - } - public AbstractIOUringChannel getChannel() { return this; } - abstract Channel newChildChannel(int fd, IOUringSubmissionQueue submissionQueue) throws Exception; + abstract Channel newChildChannel(int fd) throws Exception; final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { private final byte[] acceptedAddress = new byte[26]; @@ -87,9 +68,11 @@ public abstract class AbstractIOUringServerChannel extends AbstractIOUringChanne event.setOp(EventType.ACCEPT); event.setAbstractIOUringChannel(getChannel()); - //todo get network addresses + //Todo get network addresses submissionQueue.add(eventId, EventType.ACCEPT, getChannel().getSocket().getFd(), 0, 0, 0); ioUringEventLoop.addNewEvent(event); + submissionQueue.submit(); } } -} \ No newline at end of file +} + diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java index c4df4d0c9c..eba1f9fe5c 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java @@ -17,7 +17,7 @@ package io.netty.channel.uring; import io.netty.buffer.ByteBuf; -public class Event { +final class Event { private long id; private ByteBuf readBuffer; @@ -56,4 +56,3 @@ public class Event { this.op = op; } } - diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/EventType.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/EventType.java index e5eaba4073..b5bad66683 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/EventType.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/EventType.java @@ -15,7 +15,7 @@ */ package io.netty.channel.uring; -public enum EventType { +enum EventType { ACCEPT(13), READ(22), WRITE(23); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelConfig.java index 4d313ad6f4..ec7bacf8e3 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelConfig.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelConfig.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package io.netty.channel.uring; import io.netty.channel.Channel; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java index 57d17a3acb..9f8ab935f8 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java @@ -21,13 +21,13 @@ public class IOUringCompletionQueue { //these offsets are used to access specific properties //CQE (https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L162) - private final int CQE_USER_DATA_FIELD = 0; - private final int CQE_RES_FIELD = 8; - private final int CQE_FLAGS_FIELD = 12; + private static final int CQE_USER_DATA_FIELD = 0; + private static final int CQE_RES_FIELD = 8; + private static final int CQE_FLAGS_FIELD = 12; - private final int CQE_SIZE = 16; + private static final int CQE_SIZE = 16; - private final int IORING_ENTER_GETEVENTS = 1; + private static final int IORING_ENTER_GETEVENTS = 1; //these unsigned integer pointers(shared with the kernel) will be changed by the kernel private final long kHeadAddress; @@ -55,12 +55,10 @@ public class IOUringCompletionQueue { this.ringFd = ringFd; } - private IOUringCqe peek() { + public IOUringCqe peek() { long cqe = 0; - long head = toUnsignedLong(PlatformDependent.getInt(kHeadAddress)); + long head = toUnsignedLong(PlatformDependent.getIntVolatalile(kHeadAddress)); - //aquire memory barrier https://openjdk.java.net/jeps/171 - PlatformDependent.loadFence(); if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) { long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress)); cqe = index * CQE_SIZE + completionQueueArrayAddress; @@ -70,8 +68,7 @@ public class IOUringCompletionQueue { long flags = toUnsignedLong(PlatformDependent.getInt(cqe + CQE_FLAGS_FIELD)); //Ensure that the kernel only sees the new value of the head index after the CQEs have been read. - PlatformDependent.storeFence(); - PlatformDependent.putInt(kHeadAddress, (int) (head + 1)); + PlatformDependent.putIntOrdered(kHeadAddress, (int) (head + 1)); return new IOUringCqe(eventId, res, flags); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCqe.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCqe.java index a39351a578..3598cd38d1 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCqe.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCqe.java @@ -15,12 +15,12 @@ */ package io.netty.channel.uring; -public class IOUringCqe { +class IOUringCqe { private final long eventId; private final int res; private final long flags; - public IOUringCqe(long eventId, int res, long flags) { + IOUringCqe(long eventId, int res, long flags) { this.eventId = eventId; this.res = res; this.flags = flags; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 2b281def5c..1e0ab5bec7 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -15,23 +15,22 @@ */ package io.netty.channel.uring; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SingleThreadEventLoop; -import io.netty.util.collection.IntObjectHashMap; -import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.LongObjectHashMap; -import io.netty.util.concurrent.RejectedExecutionHandler; -import java.util.HashMap; import java.util.concurrent.Executor; +import static io.netty.channel.unix.Errors.*; + class IOUringEventLoop extends SingleThreadEventLoop { - private final IntObjectMap channels = new IntObjectHashMap(4096); // events should be unique to identify which event type that was private long eventIdCounter; private final LongObjectHashMap events = new LongObjectHashMap(); - private final RingBuffer ringBuffer; + private RingBuffer ringBuffer; protected IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { super(parent, executor, addTaskWakesUp); @@ -40,6 +39,7 @@ class IOUringEventLoop extends SingleThreadEventLoop { public long incrementEventIdCounter() { long eventId = eventIdCounter; + System.out.println(" incrementEventIdCounter EventId: " + eventId); eventIdCounter++; return eventId; } @@ -50,26 +50,104 @@ class IOUringEventLoop extends SingleThreadEventLoop { @Override protected void run() { - //Todo for (;;) { - // wait until an event has finished - //final long cqe = Native.ioUringWaitCqe(io_uring); - //final Event event = events.get(Native.ioUringGetEventId(cqe)); - //final int ret = Native.ioUringGetRes(cqe); - // switch (event.getOp()) { - // case ACCEPT: - // // serverChannel is necessary to call newChildchannel - // // create a new accept event - // break; - // case READ: - // // need to save the Bytebuf before I execute the read operation - // // fireChannelRead(byteBuf) - // break; - // case WRITE: - // // you have to store Bytebuf to continue writing - // break; - // } - // processing Tasks + final IOUringCompletionQueue ioUringCompletionQueue = ringBuffer.getIoUringCompletionQueue(); + final IOUringCqe ioUringCqe = ioUringCompletionQueue.peek(); // or waiting + + if (ioUringCqe != null) { + final Event event = events.get(ioUringCqe.getEventId()); + System.out.println("Completion EventId: " + ioUringCqe.getEventId()); + + if (event != null) { + switch (event.getOp()) { + case ACCEPT: + System.out.println("EventLoop Accept Res: " + ioUringCqe.getRes()); + if (ioUringCqe.getRes() != -1 && ioUringCqe.getRes() != ERRNO_EAGAIN_NEGATIVE && + ioUringCqe.getRes() != ERRNO_EWOULDBLOCK_NEGATIVE) { + AbstractIOUringServerChannel abstractIOUringServerChannel = + (AbstractIOUringServerChannel) event.getAbstractIOUringChannel(); + System.out.println("EventLoop Fd: " + abstractIOUringServerChannel.getSocket().getFd()); + final IOUringRecvByteAllocatorHandle allocHandle = + (IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe() + .recvBufAllocHandle(); + final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline(); + + allocHandle.lastBytesRead(ioUringCqe.getRes()); + if (allocHandle.lastBytesRead() != -1) { + allocHandle.incMessagesRead(1); + try { + pipeline.fireChannelRead(abstractIOUringServerChannel + .newChildChannel(allocHandle.lastBytesRead())); + } catch (Exception e) { + e.printStackTrace(); + } + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + } + } + long eventId = incrementEventIdCounter(); + event.setId(eventId); + ringBuffer.getIoUringSubmissionQueue() + .add(eventId, EventType.ACCEPT, event.getAbstractIOUringChannel().getSocket().getFd(), + 0, + 0, + 0); + addNewEvent(event); + ringBuffer.getIoUringSubmissionQueue().submit(); + break; + case READ: + System.out.println("Eventlloop Read Res: " + ioUringCqe.getRes()); + System.out.println("Eventloop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd()); + ByteBuf byteBuf = event.getReadBuffer(); + int localReadAmount = ioUringCqe.getRes(); + if (localReadAmount > 0) { + byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); + } + + final IOUringRecvByteAllocatorHandle allocHandle = + (IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe() + .recvBufAllocHandle(); + final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline(); + + allocHandle.lastBytesRead(localReadAmount); + if (allocHandle.lastBytesRead() <= 0) { + // nothing was read, release the buffer. + byteBuf.release(); + byteBuf = null; + break; + } + + allocHandle.incMessagesRead(1); + //readPending = false; + pipeline.fireChannelRead(byteBuf); + byteBuf = null; + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + event.getAbstractIOUringChannel().executeReadEvent(); + break; + case WRITE: + System.out.println("Eventloop Write Res: " + ioUringCqe.getRes()); + System.out.println("Eventloop Fd: " + event.getAbstractIOUringChannel().getSocket().getFd()); + //remove bytes + int localFlushAmount = ioUringCqe.getRes(); + if (localFlushAmount > 0) { + event.getAbstractIOUringChannel().unsafe().outboundBuffer().removeBytes(localFlushAmount); + } + break; + } + } else { + System.out.println("Event is null!!!! "); + } + } + //run tasks + if (hasTasks()) { + runAllTasks(); + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java index 43720a1032..1b7d01ed7d 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java @@ -27,31 +27,41 @@ import io.netty.util.concurrent.RejectedExecutionHandlers; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; -public class IOUringEventLoopGroup extends MultithreadEventLoopGroup { - +public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup { + /** + * Create a new instance using the default number of threads and the default {@link ThreadFactory}. + */ public IOUringEventLoopGroup() { this(0); } - + /** + * Create a new instance using the specified number of threads and the default {@link ThreadFactory}. + */ public IOUringEventLoopGroup(int nThreads) { this(nThreads, (ThreadFactory) null); } - + /** + * Create a new instance using the default number of threads and the given {@link ThreadFactory}. + */ @SuppressWarnings("deprecation") public IOUringEventLoopGroup(ThreadFactory threadFactory) { this(0, threadFactory, 0); } - + /** + * Create a new instance using the specified number of threads and the default {@link ThreadFactory}. + */ @SuppressWarnings("deprecation") public IOUringEventLoopGroup(int nThreads, SelectStrategyFactory selectStrategyFactory) { this(nThreads, (ThreadFactory) null, selectStrategyFactory); } - + /** + * Create a new instance using the specified number of threads and the given {@link ThreadFactory}. + */ @SuppressWarnings("deprecation") public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, 0); @@ -61,17 +71,33 @@ public class IOUringEventLoopGroup extends MultithreadEventLoopGroup { this(nThreads, executor, DefaultSelectStrategyFactory.INSTANCE); } + /** + * Create a new instance using the specified number of threads and the given {@link ThreadFactory}. + */ @SuppressWarnings("deprecation") public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectStrategyFactory selectStrategyFactory) { this(nThreads, threadFactory, 0, selectStrategyFactory); } + /** + * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given + * maximal amount of epoll events to handle per epollWait(...). + * + * @deprecated Use {@link #IOUringEventLoopGroup(int)} or {@link #IOUringEventLoopGroup(int, ThreadFactory)} + */ @Deprecated public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) { this(nThreads, threadFactory, maxEventsAtOnce, DefaultSelectStrategyFactory.INSTANCE); } + /** + * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given + * maximal amount of epoll events to handle per epollWait(...). + * + * @deprecated Use {@link #IOUringEventLoopGroup(int)}, {@link #IOUringEventLoopGroup(int, ThreadFactory)}, or + * {@link #IOUringEventLoopGroup(int, SelectStrategyFactory)} + */ @Deprecated public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce, SelectStrategyFactory selectStrategyFactory) { @@ -101,6 +127,9 @@ public class IOUringEventLoopGroup extends MultithreadEventLoopGroup { selectStrategyFactory, rejectedExecutionHandler, queueFactory); } + /** + * @deprecated This method will be removed in future releases, and is not guaranteed to have any impacts. + */ @Deprecated public void setIoRatio(int ioRatio) { if (ioRatio <= 0 || ioRatio > 100) { @@ -108,9 +137,14 @@ public class IOUringEventLoopGroup extends MultithreadEventLoopGroup { } } + //Todo @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { - //EventLoopTaskQueueFactory queueFactory = args.length == 4? (EventLoopTaskQueueFactory) args[3] : null; + //EventLoopTaskQueueFactory queueFactory = args.length == 4? (EventLoopTaskQueueFactory) args[3] : null; +// return new IOUringEventLoop(this, executor, (Integer) args[0], +// ((SelectStrategyFactory) args[1]).newSelectStrategy(), +// (RejectedExecutionHandler) args[2], queueFactory); + return new IOUringEventLoop(this, executor, false); } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringRecvByteAllocatorHandle.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringRecvByteAllocatorHandle.java index 09aebffac0..528225aeff 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringRecvByteAllocatorHandle.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringRecvByteAllocatorHandle.java @@ -21,7 +21,7 @@ import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.unix.PreferredDirectByteBufAllocator; import io.netty.util.UncheckedBooleanSupplier; -public class IOUringRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle +final class IOUringRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle implements RecvByteBufAllocator.ExtendedHandle { private final PreferredDirectByteBufAllocator preferredDirectByteBufAllocator = new PreferredDirectByteBufAllocator(); @@ -37,19 +37,19 @@ public class IOUringRecvByteAllocatorHandle extends RecvByteBufAllocator.Delegat } @Override - public final ByteBuf allocate(ByteBufAllocator alloc) { + public ByteBuf allocate(ByteBufAllocator alloc) { // We need to ensure we always allocate a direct ByteBuf as we can only use a direct buffer to read via JNI. preferredDirectByteBufAllocator.updateAllocator(alloc); return delegate().allocate(preferredDirectByteBufAllocator); } @Override - public final boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { + public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { return ((RecvByteBufAllocator.ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier); } @Override - public final boolean continueReading() { + public boolean continueReading() { return false; } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerChannelConfig.java index 4aebe8ff64..1b4bbbeb31 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerChannelConfig.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerChannelConfig.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package io.netty.channel.uring; import io.netty.buffer.ByteBufAllocator; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java index d83332278d..e513a3a64f 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.uring; import io.netty.channel.Channel; import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; import java.net.InetSocketAddress; @@ -26,29 +27,17 @@ public class IOUringServerSocketChannel extends AbstractIOUringServerChannel imp private final IOUringServerSocketChannelConfig config; public IOUringServerSocketChannel() { - super(Socket.newSocketStream().getFd()); + super(Socket.newSocketStreamBlocking().getFd()); this.config = new IOUringServerSocketChannelConfig(this); } - - @Override - public void doBind(SocketAddress localAddress) throws Exception { - super.doBind(localAddress); - } - @Override public IOUringServerSocketChannelConfig config() { return config; } @Override - public boolean isOpen() { - return false; - } - - - @Override - Channel newChildChannel(int fd, IOUringSubmissionQueue submissionQueue) throws Exception { + Channel newChildChannel(int fd) throws Exception { return new IOUringSocketChannel(this, new LinuxSocket(fd)); } @@ -66,4 +55,16 @@ public class IOUringServerSocketChannel extends AbstractIOUringServerChannel imp public InetSocketAddress localAddress() { return (InetSocketAddress) super.localAddress(); } -} \ No newline at end of file + + @Override + public void doBind(SocketAddress localAddress) throws Exception { + super.doBind(localAddress); + socket.listen(500); + active = true; + } + + @Override + public FileDescriptor fd() { + return super.fd(); + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java index fc208dec66..18f2374518 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package io.netty.channel.uring; import io.netty.channel.socket.ServerSocketChannelConfig; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java index 5d84142331..d71808a17e 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java @@ -21,7 +21,9 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelConfig; import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannelConfig; @@ -43,11 +45,6 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock return (ServerSocketChannel) super.parent(); } - @Override - public IOUringSocketChannelConfig config() { - return config; - } - @Override protected AbstractUringUnsafe newUnsafe() { return new AbstractUringUnsafe() { @@ -67,7 +64,8 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock } @Override - public void doBind(SocketAddress localAddress) throws Exception { + public IOUringSocketChannelConfig config() { + return config; } @Override @@ -117,17 +115,17 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock @Override public FileDescriptor fd() { - return null; + return super.fd(); } @Override protected SocketAddress localAddress0() { - return null; + return super.localAddress0(); } @Override protected SocketAddress remoteAddress0() { - return null; + return super.remoteAddress0(); } @Override diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java index 976e444cd3..e6dbe326b1 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package io.netty.channel.uring; import io.netty.buffer.ByteBufAllocator; @@ -20,7 +35,6 @@ public class IOUringSocketChannelConfig extends IOUringChannelConfig implements super(channel); } - @Override public int getReceiveBufferSize() { try { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 67f954ae4f..b9c122739e 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -19,203 +19,208 @@ import io.netty.util.internal.PlatformDependent; public class IOUringSubmissionQueue { - private final int SQE_SIZE = 64; - private final int INT_SIZE = 4; + private static final int SQE_SIZE = 64; + private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support? - //these offsets are used to access specific properties - //SQE https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L21 - private final int SQE_OP_CODE_FIELD = 0; - private final int SQE_FLAGS_FIELD = 1; - private final int SQE_IOPRIO_FIELD = 2; // u16 - private final int SQE_FD_FIELD = 4; // s32 - private final int SQE_OFFSET_FIELD = 8; - private final int SQE_ADDRESS_FIELD = 16; - private final int SQE_LEN_FIELD = 24; - private final int SQE_RW_FLAGS_FIELD = 28; - private final int SQE_USER_DATA_FIELD = 32; - private final int SQE_PAD_FIELD = 40; + //these offsets are used to access specific properties + //SQE https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L21 + private static final int SQE_OP_CODE_FIELD = 0; + private static final int SQE_FLAGS_FIELD = 1; + private static final int SQE_IOPRIO_FIELD = 2; // u16 + private static final int SQE_FD_FIELD = 4; // s32 + private static final int SQE_OFFSET_FIELD = 8; + private static final int SQE_ADDRESS_FIELD = 16; + private static final int SQE_LEN_FIELD = 24; + private static final int SQE_RW_FLAGS_FIELD = 28; + private static final int SQE_USER_DATA_FIELD = 32; + private static final int SQE_PAD_FIELD = 40; - //these unsigned integer pointers(shared with the kernel) will be changed by the kernel - private final long kHeadAddress; - private final long kTailAddress; - private final long kRingMaskAddress; - private final long kRingEntriesAddress; - private final long fFlagsAdress; - private final long kDroppedAddress; - private final long arrayAddress; + //these unsigned integer pointers(shared with the kernel) will be changed by the kernel + private final long kHeadAddress; + private final long kTailAddress; + private final long kRingMaskAddress; + private final long kRingEntriesAddress; + private final long fFlagsAdress; + private final long kDroppedAddress; + private final long arrayAddress; - private final long submissionQueueArrayAddress; + private final long submissionQueueArrayAddress; - private long sqeHead; - private long sqeTail; + private long sqeHead; + private long sqeTail; - private final int ringSize; - private final long ringAddress; - private final int ringFd; + private final int ringSize; + private final long ringAddress; + private final int ringFd; - public IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress, - long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress, int ringSize, - long ringAddress, int ringFd) { - this.kHeadAddress = kHeadAddress; - this.kTailAddress = kTailAddress; - this.kRingMaskAddress = kRingMaskAddress; - this.kRingEntriesAddress = kRingEntriesAddress; - this.fFlagsAdress = fFlagsAdress; - this.kDroppedAddress = kDroppedAddress; - this.arrayAddress = arrayAddress; - this.submissionQueueArrayAddress = submissionQueueArrayAddress; - this.ringSize = ringSize; - this.ringAddress = ringAddress; - this.ringFd = ringFd; - } - - public long getSqe() { - long next = sqeTail + 1; - long kRingEntries = toUnsignedLong(PlatformDependent.getInt(kRingEntriesAddress)); - long sqe = 0; - if ((next - sqeHead) <= kRingEntries) { - long index = sqeTail & toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress)); - sqe = SQE_SIZE * index + submissionQueueArrayAddress; - sqeTail = next; - } - return sqe; - } - - private void setData(long sqe, long eventId, EventType type, int fd, long bufferAddress, int length, long offset) { - //Todo cleaner - //set sqe(submission queue) properties - PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) type.getOp()); - PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); - PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0); - PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd); - PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset); - PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress); - PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length); - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); - PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId); - - // pad field array -> all fields should be zero - long offsetIndex = 0; - for (int i = 0; i < 3; i++) { - PlatformDependent.putLong(sqe + SQE_PAD_FIELD + offsetIndex, 0); - offsetIndex += 8; + public IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress, + long fFlagsAdress, long kDroppedAddress, long arrayAddress, + long submissionQueueArrayAddress, int ringSize, + long ringAddress, int ringFd) { + this.kHeadAddress = kHeadAddress; + this.kTailAddress = kTailAddress; + this.kRingMaskAddress = kRingMaskAddress; + this.kRingEntriesAddress = kRingEntriesAddress; + this.fFlagsAdress = fFlagsAdress; + this.kDroppedAddress = kDroppedAddress; + this.arrayAddress = arrayAddress; + this.submissionQueueArrayAddress = submissionQueueArrayAddress; + this.ringSize = ringSize; + this.ringAddress = ringAddress; + this.ringFd = ringFd; } - System.out.println("OPField: " + PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)); - System.out.println("UserDataField: " + PlatformDependent.getByte(sqe + SQE_USER_DATA_FIELD)); - } - - public boolean add(long eventId, EventType type, int fd, long bufferAddress, int pos, int limit) { - long sqe = getSqe(); - if (sqe == 0) { - return false; - } - setData(sqe, eventId, type, fd, bufferAddress + pos, limit - pos, 0); - return true; - } - - private int flushSqe() { - long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress)); - long kHead = toUnsignedLong(PlatformDependent.getInt(kHeadAddress)); - long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress)); - - System.out.println("Ktail: " + kTail); - System.out.println("Ktail: " + kHead); - System.out.println("SqeHead: " + sqeHead); - System.out.println("SqeTail: " + sqeTail); - - if (sqeHead == sqeTail) { - return (int) (kTail - kHead); + public long getSqe() { + long next = sqeTail + 1; + long kRingEntries = toUnsignedLong(PlatformDependent.getInt(kRingEntriesAddress)); + long sqe = 0; + if ((next - sqeHead) <= kRingEntries) { + long index = sqeTail & toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress)); + sqe = SQE_SIZE * index + submissionQueueArrayAddress; + sqeTail = next; + } + return sqe; } - long toSubmit = sqeTail - sqeHead; - while (toSubmit > 0) { - long index = kTail & kRingMask; + private void setData(long sqe, long eventId, EventType type, int fd, long bufferAddress, int length, long offset) { + //Todo cleaner + //set sqe(submission queue) properties + PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) type.getOp()); + PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); + PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0); + PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd); + PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset); + PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress); + PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length); + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); + PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId); - PlatformDependent.putInt(arrayAddress + index * INT_SIZE, (int) (sqeHead & kRingMask)); + // pad field array -> all fields should be zero + long offsetIndex = 0; + for (int i = 0; i < 3; i++) { + PlatformDependent.putLong(sqe + SQE_PAD_FIELD + offsetIndex, 0); + offsetIndex += 8; + } - sqeHead++; - kTail++; - toSubmit--; + System.out.println("OPField: " + PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)); + System.out.println("UserDataField: " + PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD)); + System.out.println("BufferAddress: " + PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD)); + System.out.println("Length: " + PlatformDependent.getInt(sqe + SQE_LEN_FIELD)); + System.out.println("Offset: " + PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD)); } - //release memory barrier - PlatformDependent.storeFence(); - - PlatformDependent.putInt(kTailAddress, (int) kTail); - - return (int) (kTail - kHead); - } - - public void submit() { - int submitted = flushSqe(); - System.out.println("Submitted: " + submitted); - - int ret = Native.ioUringEnter(ringFd, submitted, 0, 0); - if (ret < 0) { - throw new RuntimeException("ioUringEnter syscall"); + //Todo ring buffer errors for example if submission queue is full + public boolean add(long eventId, EventType type, int fd, long bufferAddress, int pos, int limit) { + long sqe = getSqe(); + if (sqe == 0) { + return false; + } + System.out.println("fd " + fd); + System.out.println("BufferAddress + pos: " + (bufferAddress + pos)); + System.out.println("limit + pos " + (limit - pos)); + setData(sqe, eventId, type, fd, bufferAddress + pos, limit - pos, 0); + return true; } - } - public void setSqeHead(long sqeHead) { - this.sqeHead = sqeHead; - } + private int flushSqe() { + long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress)); + long kHead = toUnsignedLong(PlatformDependent.getIntVolatalile(kHeadAddress)); + long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress)); - public void setSqeTail(long sqeTail) { - this.sqeTail = sqeTail; - } + System.out.println("Ktail: " + kTail); + System.out.println("Ktail: " + kHead); + System.out.println("SqeHead: " + sqeHead); + System.out.println("SqeTail: " + sqeTail); - public long getKHeadAddress() { - return this.kHeadAddress; - } + if (sqeHead == sqeTail) { + return (int) (kTail - kHead); + } - public long getKTailAddress() { - return this.kTailAddress; - } + long toSubmit = sqeTail - sqeHead; + while (toSubmit > 0) { + long index = kTail & kRingMask; - public long getKRingMaskAddress() { - return this.kRingMaskAddress; - } + PlatformDependent.putInt(arrayAddress + index * INT_SIZE, (int) (sqeHead & kRingMask)); - public long getKRingEntriesAddress() { - return this.kRingEntriesAddress; - } + sqeHead++; + kTail++; + toSubmit--; + } - public long getFFlagsAdress() { - return this.fFlagsAdress; - } + PlatformDependent.putIntOrdered(kTailAddress, (int) kTail); - public long getKDroppedAddress() { - return this.kDroppedAddress; - } + return (int) (kTail - kHead); + } - public long getArrayAddress() { - return this.arrayAddress; - } + public void submit() { + int submitted = flushSqe(); + System.out.println("Submitted: " + submitted); - public long getSubmissionQueueArrayAddress() { - return this.submissionQueueArrayAddress; - } + int ret = Native.ioUringEnter(ringFd, submitted, 0, 0); + if (ret < 0) { + throw new RuntimeException("ioUringEnter syscall"); + } + } - public long getSqeHead() { - return this.sqeHead; - } + public void setSqeHead(long sqeHead) { + this.sqeHead = sqeHead; + } - public long getSqeTail() { - return this.sqeTail; - } + public void setSqeTail(long sqeTail) { + this.sqeTail = sqeTail; + } - public int getRingSize() { - return this.ringSize; - } + public long getKHeadAddress() { + return this.kHeadAddress; + } - public long getRingAddress() { - return this.ringAddress; - } + public long getKTailAddress() { + return this.kTailAddress; + } - //Todo Integer.toUnsignedLong -> maven checkstyle error - public static long toUnsignedLong(int x) { - return ((long) x) & 0xffffffffL; -} + public long getKRingMaskAddress() { + return this.kRingMaskAddress; + } + + public long getKRingEntriesAddress() { + return this.kRingEntriesAddress; + } + + public long getFFlagsAdress() { + return this.fFlagsAdress; + } + + public long getKDroppedAddress() { + return this.kDroppedAddress; + } + + public long getArrayAddress() { + return this.arrayAddress; + } + + public long getSubmissionQueueArrayAddress() { + return this.submissionQueueArrayAddress; + } + + public long getSqeHead() { + return this.sqeHead; + } + + public long getSqeTail() { + return this.sqeTail; + } + + public int getRingSize() { + return this.ringSize; + } + + public long getRingAddress() { + return this.ringAddress; + } + + //Todo Integer.toUnsignedLong -> maven checkstyle error + public static long toUnsignedLong(int x) { + return ((long) x) & 0xffffffffL; + } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java index 86a8336db7..a46262526a 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java @@ -25,18 +25,7 @@ public class LinuxSocket extends Socket { this.fd = fd; } - //Todo - - // public int readEvent(long ring, long eventId, long bufferAddress, int pos, int limit) { - // return Native.ioUringRead(ring, fd, eventId, bufferAddress, pos, limit); - // } - - // public int writeEvent(long ring, long eventId, long bufferAddress, int pos, int limit) { - // return Native.ioUringWrite(ring, fd, eventId, bufferAddress, pos, limit); - // } - - // public int acceptEvent(long ring, long eventId, byte[] addr) { - // return Native.ioUringAccept(ring, eventId, addr); - // } - + public int getFd() { + return fd; + } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index a22fb6c03a..37cf61f930 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -25,7 +25,6 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; -import java.nio.channels.Selector; import java.util.Locale; public final class Native { @@ -33,6 +32,7 @@ public final class Native { private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 32); static { loadNativeLibrary(); + Socket.initialize(); } public static RingBuffer createRingBuffer(int ringSize) { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java index 789c88da54..3d266698d8 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java @@ -15,13 +15,12 @@ */ package io.netty.channel.uring; -import io.netty.util.internal.PlatformDependent; -public class RingBuffer { +class RingBuffer { private final IOUringSubmissionQueue ioUringSubmissionQueue; private final IOUringCompletionQueue ioUringCompletionQueue; - public RingBuffer(IOUringSubmissionQueue ioUringSubmissionQueue, IOUringCompletionQueue ioUringCompletionQueue) { + RingBuffer(IOUringSubmissionQueue ioUringSubmissionQueue, IOUringCompletionQueue ioUringCompletionQueue) { this.ioUringSubmissionQueue = ioUringSubmissionQueue; this.ioUringCompletionQueue = ioUringCompletionQueue; } diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index a14a53dbee..b9da652418 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -15,6 +15,7 @@ */ package io.netty.channel.uring; +import io.netty.channel.unix.Socket; import org.junit.Test; import java.io.FileInputStream; diff --git a/transport-native-unix-common/src/main/c/netty_unix_socket.c b/transport-native-unix-common/src/main/c/netty_unix_socket.c index 3f04f4b174..f75818c22b 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_socket.c +++ b/transport-native-unix-common/src/main/c/netty_unix_socket.c @@ -69,6 +69,11 @@ static int nettyNonBlockingSocket(int domain, int type, int protocol) { #endif } +//only temporary +static int nettyBlockingSocket(int domain, int type, int protocol) { + return socket(domain, type, protocol); +} + int netty_unix_socket_ipAddressLength(const struct sockaddr_storage* addr) { if (addr->ss_family == AF_INET) { return 4; @@ -613,6 +618,17 @@ static jint netty_unix_socket_newSocketStreamFd(JNIEnv* env, jclass clazz, jbool return _socket(env, clazz, domain, SOCK_STREAM); } +//only temporary +static jint netty_unix_socket_newSocketStreamFd_blocking(JNIEnv* env, jclass clazz, jboolean ipv6) { + int domain = ipv6 == JNI_TRUE ? AF_INET6 : AF_INET; + + int fd = nettyBlockingSocket(domain, SOCK_STREAM, 0); + if (fd == -1) { + return -errno; + } + return fd; +} + static jint netty_unix_socket_newSocketDomainFd(JNIEnv* env, jclass clazz) { int fd = nettyNonBlockingSocket(PF_UNIX, SOCK_STREAM, 0); if (fd == -1) { @@ -979,6 +995,7 @@ static const JNINativeMethod fixed_method_table[] = { { "localAddress", "(I)[B", (void *) netty_unix_socket_localAddress }, { "newSocketDgramFd", "(Z)I", (void *) netty_unix_socket_newSocketDgramFd }, { "newSocketStreamFd", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd }, + { "newSocketStreamFdBlocking", "(Z)I", (void *) netty_unix_socket_newSocketStreamFd_blocking }, //temporary { "newSocketDomainFd", "()I", (void *) netty_unix_socket_newSocketDomainFd }, { "sendTo", "(IZLjava/nio/ByteBuffer;II[BII)I", (void *) netty_unix_socket_sendTo }, { "sendToAddress", "(IZJII[BII)I", (void *) netty_unix_socket_sendToAddress }, diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java index 10a68d4b34..117a207a67 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java @@ -397,6 +397,11 @@ public class Socket extends FileDescriptor { return new Socket(newSocketStream0()); } + public static Socket newSocketStreamBlocking() { + System.out.println("newSocketStreamBlocking"); + return new Socket(newSocketStreamBlocking(isIPv6Preferred())); + } + public static Socket newSocketDgram() { return new Socket(newSocketDgram0()); } @@ -423,6 +428,15 @@ public class Socket extends FileDescriptor { return res; } + //only temporary + protected static int newSocketStreamBlocking(boolean ipv6) { + int res = newSocketStreamFdBlocking(ipv6); + if (res < 0) { + throw new ChannelException(newIOException("newSocketStream", res)); + } + return res; + } + protected static int newSocketDgram0() { return newSocketDgram0(isIPv6Preferred()); } @@ -471,6 +485,7 @@ public class Socket extends FileDescriptor { private static native int sendFd(int socketFd, int fd); private static native int newSocketStreamFd(boolean ipv6); + private static native int newSocketStreamFdBlocking(boolean ipv6); private static native int newSocketDgramFd(boolean ipv6); private static native int newSocketDomainFd();