From 09a0b78a8134af792bb80acc38b0d15f7589ab46 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 23 Sep 2020 11:21:06 +0200 Subject: [PATCH] Add IOUringDatagramChannel and so also support UDP (#10588) Motivation: We can also support UDP / Datagram based on io_uring, so we should do it for maximal performance Modifications: - Add IOUringDatagramChannel - Add tests based on our transport testsuite for it Result: UDP / Datagram is supported via io_uring as well now --- .../src/main/c/netty_io_uring_native.c | 73 +++ .../channel/uring/AbstractIOUringChannel.java | 124 ++-- .../uring/AbstractIOUringServerChannel.java | 14 +- .../uring/AbstractIOUringStreamChannel.java | 24 + .../channel/uring/IOUringDatagramChannel.java | 550 ++++++++++++++++++ .../uring/IOUringDatagramChannelConfig.java | 466 +++++++++++++++ .../netty/channel/uring/IOUringEventLoop.java | 5 +- .../channel/uring/IOUringSubmissionQueue.java | 9 +- .../main/java/io/netty/channel/uring/Iov.java | 40 ++ .../java/io/netty/channel/uring/MsgHdr.java | 51 ++ .../java/io/netty/channel/uring/Native.java | 18 +- .../NativeStaticallyReferencedJniMethods.java | 15 + .../io/netty/channel/uring/SockaddrIn.java | 8 + .../IOUringDatagramConnectNotExistsTest.java | 30 + .../IOUringDatagramMulticastIPv6Test.java | 30 + .../uring/IOUringDatagramMulticastTest.java | 29 + .../IOUringDatagramUnicastIPv6MappedTest.java | 29 + .../uring/IOUringDatagramUnicastIPv6Test.java | 30 + .../uring/IOUringDatagramUnicastTest.java | 30 + .../uring/IOUringSocketTestPermutation.java | 47 ++ 20 files changed, 1551 insertions(+), 71 deletions(-) create mode 100644 transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannel.java create mode 100644 transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannelConfig.java create mode 100644 transport-native-io_uring/src/main/java/io/netty/channel/uring/Iov.java create mode 100644 transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdr.java create mode 100644 transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramConnectNotExistsTest.java create mode 100644 transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramMulticastIPv6Test.java create mode 100644 transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramMulticastTest.java create mode 100644 transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastIPv6MappedTest.java create mode 100644 transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastIPv6Test.java create mode 100644 transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastTest.java diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index b269520d36..736b333e27 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -323,6 +323,56 @@ static jint netty_io_uring_in6AddressOffsetofS6Addr(JNIEnv* env, jclass clazz) { return offsetof(struct in6_addr, s6_addr); } +static jint netty_io_uring_sizeofSockaddrStorage(JNIEnv* env, jclass clazz) { + return sizeof(struct sockaddr_storage); +} + +static jint netty_io_uring_sizeofSizeT(JNIEnv* env, jclass clazz) { + return sizeof(size_t); +} + +static jint netty_io_uring_sizeofIovec(JNIEnv* env, jclass clazz) { + return sizeof(struct iovec); +} + +static jint netty_io_uring_iovecOffsetofIovBase(JNIEnv* env, jclass clazz) { + return offsetof(struct iovec, iov_base); +} + +static jint netty_io_uring_iovecOffsetofIovLen(JNIEnv* env, jclass clazz) { + return offsetof(struct iovec, iov_len); +} + +static jint netty_io_uring_sizeofMsghdr(JNIEnv* env, jclass clazz) { + return sizeof(struct msghdr); +} + +static jint netty_io_uring_msghdrOffsetofMsgName(JNIEnv* env, jclass clazz) { + return offsetof(struct msghdr, msg_name); +} + +static jint netty_io_uring_msghdrOffsetofMsgNamelen(JNIEnv* env, jclass clazz) { + return offsetof(struct msghdr, msg_namelen); +} +static jint netty_io_uring_msghdrOffsetofMsgIov(JNIEnv* env, jclass clazz) { + return offsetof(struct msghdr, msg_iov); +} +static jint netty_io_uring_msghdrOffsetofMsgIovlen(JNIEnv* env, jclass clazz) { + return offsetof(struct msghdr, msg_iovlen); +} + +static jint netty_io_uring_msghdrOffsetofMsgControl(JNIEnv* env, jclass clazz) { + return offsetof(struct msghdr, msg_control); +} + +static jint netty_io_uring_msghdrOffsetofMsgControllen(JNIEnv* env, jclass clazz) { + return offsetof(struct msghdr, msg_controllen); +} + +static jint netty_io_uring_msghdrOffsetofMsgFlags(JNIEnv* env, jclass clazz) { + return offsetof(struct msghdr, msg_flags); +} + static jint netty_io_uring_etime(JNIEnv* env, jclass clazz) { return ETIME; } @@ -379,6 +429,14 @@ static jint netty_io_uring_ioringOpClose(JNIEnv* env, jclass clazz) { return IORING_OP_CLOSE; } +static jint netty_io_uring_ioringOpSendmsg(JNIEnv* env, jclass clazz) { + return IORING_OP_SENDMSG; +} + +static jint netty_io_uring_ioringOpRecvmsg(JNIEnv* env, jclass clazz) { + return IORING_OP_RECVMSG; +} + static jint netty_io_uring_ioringEnterGetevents(JNIEnv* env, jclass clazz) { return IORING_ENTER_GETEVENTS; } @@ -406,6 +464,19 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = { { "sockaddrIn6OffsetofSin6Addr", "()I", (void *) netty_io_uring_sockaddrIn6OffsetofSin6Addr }, { "sockaddrIn6OffsetofSin6ScopeId", "()I", (void *) netty_io_uring_sockaddrIn6OffsetofSin6ScopeId }, { "in6AddressOffsetofS6Addr", "()I", (void *) netty_io_uring_in6AddressOffsetofS6Addr }, + { "sizeofSockaddrStorage", "()I", (void *) netty_io_uring_sizeofSockaddrStorage }, + { "sizeofSizeT", "()I", (void *) netty_io_uring_sizeofSizeT }, + { "sizeofIovec", "()I", (void *) netty_io_uring_sizeofIovec }, + { "iovecOffsetofIovBase", "()I", (void *) netty_io_uring_iovecOffsetofIovBase }, + { "iovecOffsetofIovLen", "()I", (void *) netty_io_uring_iovecOffsetofIovLen }, + { "sizeofMsghdr", "()I", (void *) netty_io_uring_sizeofMsghdr }, + { "msghdrOffsetofMsgName", "()I", (void *) netty_io_uring_msghdrOffsetofMsgName }, + { "msghdrOffsetofMsgNamelen", "()I", (void *) netty_io_uring_msghdrOffsetofMsgNamelen }, + { "msghdrOffsetofMsgIov", "()I", (void *) netty_io_uring_msghdrOffsetofMsgIov }, + { "msghdrOffsetofMsgIovlen", "()I", (void *) netty_io_uring_msghdrOffsetofMsgIovlen }, + { "msghdrOffsetofMsgControl", "()I", (void *) netty_io_uring_msghdrOffsetofMsgControl }, + { "msghdrOffsetofMsgControllen", "()I", (void *) netty_io_uring_msghdrOffsetofMsgControllen }, + { "msghdrOffsetofMsgFlags", "()I", (void *) netty_io_uring_msghdrOffsetofMsgFlags }, { "etime", "()I", (void *) netty_io_uring_etime }, { "ecanceled", "()I", (void *) netty_io_uring_ecanceled }, { "pollin", "()I", (void *) netty_io_uring_pollin }, @@ -420,6 +491,8 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = { { "ioringOpWrite", "()I", (void *) netty_io_uring_ioringOpWrite }, { "ioringOpConnect", "()I", (void *) netty_io_uring_ioringOpConnect }, { "ioringOpClose", "()I", (void *) netty_io_uring_ioringOpClose }, + { "ioringOpSendmsg", "()I", (void *) netty_io_uring_ioringOpSendmsg }, + { "ioringOpRecvmsg", "()I", (void *) netty_io_uring_ioringOpRecvmsg }, { "ioringEnterGetevents", "()I", (void *) netty_io_uring_ioringEnterGetevents }, { "iosqeAsync", "()I", (void *) netty_io_uring_iosqeAsync } }; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index fbd4adce43..92a38ce4bc 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.uring; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufHolder; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.AbstractChannel; @@ -29,7 +30,6 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromiseNotifier; import io.netty.channel.ConnectTimeoutException; -import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; @@ -38,9 +38,6 @@ import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.Buffer; import io.netty.channel.unix.Errors; import io.netty.channel.unix.FileDescriptor; -import io.netty.channel.unix.IovArray; -import io.netty.channel.unix.NativeInetAddress; -import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.ReferenceCountUtil; @@ -48,8 +45,6 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; -import java.net.Inet6Address; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -82,7 +77,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha private ChannelPromise delayedClose; private boolean inputClosedSeenErrorOnRead; - static final int SOCK_ADDR_LEN = 128; /** * The future of the current connection attempt. If not null, subsequent connection attempts will fail. @@ -257,8 +251,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } - //deregister - // Channel/ChannelHandlerContext.read() was called @Override protected void doBeginRead() { if ((ioState & POLL_IN_SCHEDULED) == 0) { @@ -286,39 +278,21 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha if (msgCount == 0) { return; } - ByteBuf msg = (ByteBuf) in.current(); - if (msgCount > 1 || - // We also need some special handling for CompositeByteBuf - msg.nioBufferCount() > 1) { - doWriteMultiple(in); - } else if (msgCount == 1) { - doWriteSingle(msg); - } - } + Object msg = in.current(); - private void doWriteMultiple(ChannelOutboundBuffer in) { - final IovArray iovecArray = ((IOUringEventLoop) eventLoop()).iovArray(); - try { - int offset = iovecArray.count(); - in.forEachFlushedMessage(iovecArray); - submissionQueue().addWritev(socket.intValue(), - iovecArray.memoryAddress(offset), iovecArray.count() - offset); - ioState |= WRITE_SCHEDULED; - } catch (Exception e) { - // This should never happen, anyway fallback to single write. - doWriteSingle((ByteBuf) in.current()); - } - } - - protected final void doWriteSingle(ByteBuf buf) { assert (ioState & WRITE_SCHEDULED) == 0; - IOUringSubmissionQueue submissionQueue = submissionQueue(); - submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), - buf.writerIndex()); + if (msgCount > 1) { + ioUringUnsafe().scheduleWriteMultiple(in); + } else if ((msg instanceof ByteBuf) && ((ByteBuf) msg).nioBufferCount() > 1 || + ((msg instanceof ByteBufHolder) && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) { + // We also need some special handling for CompositeByteBuf + ioUringUnsafe().scheduleWriteMultiple(in); + } else { + ioUringUnsafe().scheduleWriteSingle(msg); + } ioState |= WRITE_SCHEDULED; } - //POLLOUT private void schedulePollOut() { assert (ioState & POLL_OUT_SCHEDULED) == 0; IOUringSubmissionQueue submissionQueue = submissionQueue(); @@ -326,16 +300,31 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha ioState |= POLL_OUT_SCHEDULED; } - void schedulePollRdHup() { + final void schedulePollRdHup() { assert (ioState & POLL_RDHUP_SCHEDULED) == 0; IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollRdHup(fd().intValue()); ioState |= POLL_RDHUP_SCHEDULED; } + final void resetCachedAddresses() { + local = socket.localAddress(); + remote = socket.remoteAddress(); + } + abstract class AbstractUringUnsafe extends AbstractUnsafe { private IOUringRecvByteAllocatorHandle allocHandle; + /** + * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer}. + */ + protected abstract void scheduleWriteMultiple(ChannelOutboundBuffer in); + + /** + * Schedule the write of a singe message. + */ + protected abstract void scheduleWriteSingle(Object msg); + @Override public void close(ChannelPromise promise) { if ((ioState & (WRITE_SCHEDULED | READ_SCHEDULED | CONNECT_SCHEDULED)) == 0) { @@ -414,19 +403,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } - IOUringRecvByteAllocatorHandle newIOUringHandle(RecvByteBufAllocator.ExtendedHandle handle) { + final IOUringRecvByteAllocatorHandle newIOUringHandle(RecvByteBufAllocator.ExtendedHandle handle) { return new IOUringRecvByteAllocatorHandle(handle); } @Override - public IOUringRecvByteAllocatorHandle recvBufAllocHandle() { + public final IOUringRecvByteAllocatorHandle recvBufAllocHandle() { if (allocHandle == null) { allocHandle = newIOUringHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle()); } return allocHandle; } - void shutdownInput(boolean rdHup) { + final void shutdownInput(boolean rdHup) { logger.trace("shutdownInput Fd: {}", fd().intValue()); if (!socket.isInputShutdown()) { if (isAllowHalfClosure(config())) { @@ -456,7 +445,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha close(voidPromise()); } - void schedulePollIn() { + final void schedulePollIn() { assert (ioState & POLL_IN_SCHEDULED) == 0; if (!isActive() || shouldBreakIoUringInReady(config())) { return; @@ -466,7 +455,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha submissionQueue.addPollIn(socket.intValue()); } - void processDelayedClose() { + final void processDelayedClose() { ChannelPromise promise = delayedClose; if (promise != null && (ioState & (READ_SCHEDULED | WRITE_SCHEDULED | CONNECT_SCHEDULED)) == 0) { delayedClose = null; @@ -480,6 +469,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha readComplete0(res); } + /** + * Called once a read was completed. + */ protected abstract void readComplete0(int res); /** @@ -534,6 +526,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } + /** + * A read should be scheduled. + */ protected abstract void scheduleRead0(); /** @@ -574,16 +569,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha schedulePollOut(); } } - } else if (!getSocket().isOutputShutdown()) { + } else if (!socket.isOutputShutdown()) { // Try writing again super.flush0(); } } + /** + * Called once a write was completed. + */ final void writeComplete(int res) { ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); if (res >= 0) { - channelOutboundBuffer.removeBytes(res); + removeFromOutboundBuffer(channelOutboundBuffer, res); // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write // while still removing messages internally in removeBytes(...) which then may corrupt state. ioState &= ~WRITE_SCHEDULED; @@ -601,7 +599,17 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } - final void connectComplete(int res) { + /** + * Called once a write completed and we should remove message(s) from the {@link ChannelOutboundBuffer} + */ + protected void removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int bytes) { + outboundBuffer.removeBytes(bytes); + } + + /** + * Connect was completed. + */ + void connectComplete(int res) { ioState &= ~CONNECT_SCHEDULED; freeRemoteAddressMemory(); @@ -646,19 +654,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha doConnect(remoteAddress, localAddress); InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress; - remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN); + remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE); long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory); - if (socket.isIpv6()) { - SockaddrIn.writeIPv6(remoteAddressMemoryAddress, inetSocketAddress.getAddress(), - inetSocketAddress.getPort()); - } else { - SockaddrIn.writeIPv4(remoteAddressMemoryAddress, inetSocketAddress.getAddress(), - inetSocketAddress.getPort()); - } + SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress); final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue(); - ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN); + ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, + Native.SIZEOF_SOCKADDR_STORAGE); ioState |= CONNECT_SCHEDULED; } catch (Throwable t) { closeIfClosed(); @@ -714,7 +717,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } @Override - protected void doDeregister() { + protected final void doDeregister() { IOUringSubmissionQueue submissionQueue = submissionQueue(); if (submissionQueue != null) { @@ -735,7 +738,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } @Override - public void doBind(final SocketAddress local) throws Exception { + protected void doBind(final SocketAddress local) throws Exception { if (local instanceof InetSocketAddress) { checkResolvable((InetSocketAddress) local); } @@ -749,9 +752,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } - @Override - public abstract DefaultChannelConfig config(); - @Override protected SocketAddress localAddress0() { return local; @@ -762,10 +762,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha return remote; } - protected Socket getSocket() { - return socket; - } - /** * Connect to the remote peer */ diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 26c2d25d36..2f2ee628bf 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -39,12 +39,12 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple protected AbstractIOUringServerChannel(LinuxSocket socket, boolean active) { super(null, socket, active); - acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN); + acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE); acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory); acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES); // Needs to be initialized to the size of acceptedAddressMemory. // See https://man7.org/linux/man-pages/man2/accept.2.html - acceptedAddressLengthMemory.putLong(0, SOCK_ADDR_LEN); + acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE); acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory); } @@ -74,6 +74,16 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { + @Override + protected void scheduleWriteMultiple(ChannelOutboundBuffer in) { + // Do nothing + } + + @Override + protected void scheduleWriteSingle(Object msg) { + // Do nothing + } + @Override protected void scheduleRead0() { final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index 68112bdeed..13c58aab92 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -19,10 +19,12 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.socket.DuplexChannel; +import io.netty.channel.unix.IovArray; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -208,6 +210,28 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple private ByteBuf readBuffer; + @Override + protected void scheduleWriteMultiple(ChannelOutboundBuffer in) { + final IovArray iovecArray = ((IOUringEventLoop) eventLoop()).iovArray(); + try { + int offset = iovecArray.count(); + in.forEachFlushedMessage(iovecArray); + submissionQueue().addWritev(socket.intValue(), + iovecArray.memoryAddress(offset), iovecArray.count() - offset); + } catch (Exception e) { + // This should never happen, anyway fallback to single write. + scheduleWriteSingle(in.current()); + } + } + + @Override + protected void scheduleWriteSingle(Object msg) { + ByteBuf buf = (ByteBuf) msg; + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), + buf.writerIndex()); + } + @Override protected void scheduleRead0() { final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannel.java new file mode 100644 index 0000000000..ee8fc40cf2 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannel.java @@ -0,0 +1,550 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultAddressedEnvelope; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.channel.unix.Buffer; +import io.netty.channel.unix.Errors; +import io.netty.channel.unix.Errors.NativeIoException; +import io.netty.channel.unix.Socket; +import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; + +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.PortUnreachableException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import static io.netty.channel.unix.Errors.ioResult; + +public final class IOUringDatagramChannel extends AbstractIOUringChannel implements DatagramChannel { + private static final ChannelMetadata METADATA = new ChannelMetadata(true); + private static final String EXPECTED_TYPES = + " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " + + StringUtil.simpleClassName(AddressedEnvelope.class) + '<' + + StringUtil.simpleClassName(ByteBuf.class) + ", " + + StringUtil.simpleClassName(InetSocketAddress.class) + ">, " + + StringUtil.simpleClassName(ByteBuf.class) + ')'; + + private final IOUringDatagramChannelConfig config; + private volatile boolean connected; + + /** + * Create a new instance which selects the {@link InternetProtocolFamily} to use depending + * on the Operation Systems default which will be chosen. + */ + public IOUringDatagramChannel() { + this(null); + } + + /** + * Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend + * on the Operation Systems default which will be chosen. + */ + public IOUringDatagramChannel(InternetProtocolFamily family) { + this(family == null ? + LinuxSocket.newSocketDgram(Socket.isIPv6Preferred()) : + LinuxSocket.newSocketDgram(family == InternetProtocolFamily.IPv6), false); + } + + /** + * Create a new instance which selects the {@link InternetProtocolFamily} to use depending + * on the Operation Systems default which will be chosen. + */ + public IOUringDatagramChannel(int fd) { + this(new LinuxSocket(fd), true); + } + + private IOUringDatagramChannel(LinuxSocket fd, boolean active) { + super(null, fd, active); + config = new IOUringDatagramChannelConfig(this); + } + + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); + } + + @Override + public ChannelMetadata metadata() { + return METADATA; + } + + @Override + public boolean isActive() { + return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active); + } + + @Override + public boolean isConnected() { + return connected; + } + + @Override + public ChannelFuture joinGroup(InetAddress multicastAddress) { + return joinGroup(multicastAddress, newPromise()); + } + + @Override + public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) { + try { + return joinGroup( + multicastAddress, + NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise); + } catch (IOException e) { + promise.setFailure(e); + } + return promise; + } + + @Override + public ChannelFuture joinGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface) { + return joinGroup(multicastAddress, networkInterface, newPromise()); + } + + @Override + public ChannelFuture joinGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface, + ChannelPromise promise) { + return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise); + } + + @Override + public ChannelFuture joinGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { + return joinGroup(multicastAddress, networkInterface, source, newPromise()); + } + + @Override + public ChannelFuture joinGroup( + final InetAddress multicastAddress, final NetworkInterface networkInterface, + final InetAddress source, final ChannelPromise promise) { + + ObjectUtil.checkNotNull(multicastAddress, "multicastAddress"); + ObjectUtil.checkNotNull(networkInterface, "networkInterface"); + + try { + socket.joinGroup(multicastAddress, networkInterface, source); + promise.setSuccess(); + } catch (IOException e) { + promise.setFailure(e); + } + return promise; + } + + @Override + public ChannelFuture leaveGroup(InetAddress multicastAddress) { + return leaveGroup(multicastAddress, newPromise()); + } + + @Override + public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) { + try { + return leaveGroup( + multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise); + } catch (IOException e) { + promise.setFailure(e); + } + return promise; + } + + @Override + public ChannelFuture leaveGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface) { + return leaveGroup(multicastAddress, networkInterface, newPromise()); + } + + @Override + public ChannelFuture leaveGroup( + InetSocketAddress multicastAddress, + NetworkInterface networkInterface, ChannelPromise promise) { + return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise); + } + + @Override + public ChannelFuture leaveGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { + return leaveGroup(multicastAddress, networkInterface, source, newPromise()); + } + + @Override + public ChannelFuture leaveGroup( + final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source, + final ChannelPromise promise) { + ObjectUtil.checkNotNull(multicastAddress, "multicastAddress"); + ObjectUtil.checkNotNull(networkInterface, "networkInterface"); + + try { + socket.leaveGroup(multicastAddress, networkInterface, source); + promise.setSuccess(); + } catch (IOException e) { + promise.setFailure(e); + } + return promise; + } + + @Override + public ChannelFuture block( + InetAddress multicastAddress, NetworkInterface networkInterface, + InetAddress sourceToBlock) { + return block(multicastAddress, networkInterface, sourceToBlock, newPromise()); + } + + @Override + public ChannelFuture block( + final InetAddress multicastAddress, final NetworkInterface networkInterface, + final InetAddress sourceToBlock, final ChannelPromise promise) { + ObjectUtil.checkNotNull(multicastAddress, "multicastAddress"); + ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock"); + ObjectUtil.checkNotNull(networkInterface, "networkInterface"); + + promise.setFailure(new UnsupportedOperationException("Multicast not supported")); + return promise; + } + + @Override + public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) { + return block(multicastAddress, sourceToBlock, newPromise()); + } + + @Override + public ChannelFuture block( + InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) { + try { + return block( + multicastAddress, + NetworkInterface.getByInetAddress(localAddress().getAddress()), + sourceToBlock, promise); + } catch (Throwable e) { + promise.setFailure(e); + } + return promise; + } + + @Override + protected AbstractUringUnsafe newUnsafe() { + return new IOUringDatagramChannelUnsafe(); + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + if (localAddress instanceof InetSocketAddress) { + InetSocketAddress socketAddress = (InetSocketAddress) localAddress; + if (socketAddress.getAddress().isAnyLocalAddress() && + socketAddress.getAddress() instanceof Inet4Address && Socket.isIPv6Preferred()) { + localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort()); + } + } + super.doBind(localAddress); + active = true; + } + + @Override + protected Object filterOutboundMessage(Object msg) { + if (msg instanceof DatagramPacket) { + DatagramPacket packet = (DatagramPacket) msg; + ByteBuf content = packet.content(); + return !content.hasMemoryAddress() ? + new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg; + } + + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf; + } + + if (msg instanceof AddressedEnvelope) { + @SuppressWarnings("unchecked") + AddressedEnvelope e = (AddressedEnvelope) msg; + if (e.content() instanceof ByteBuf && + (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) { + + ByteBuf content = (ByteBuf) e.content(); + return !content.hasMemoryAddress()? + new DefaultAddressedEnvelope( + newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e; + } + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + + @Override + public IOUringDatagramChannelConfig config() { + return config; + } + + @Override + protected void doDisconnect() throws Exception { + // TODO: use io_uring for this too... + socket.disconnect(); + connected = active = false; + + resetCachedAddresses(); + } + + @Override + protected void doClose() throws Exception { + super.doClose(); + ((IOUringDatagramChannelUnsafe) unsafe()).releaseBuffers(); + connected = false; + } + + final class IOUringDatagramChannelUnsafe extends AbstractUringUnsafe { + private ByteBuf readBuffer; + private boolean recvMsg; + + // These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg + // + // TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use + // some sort of other pool. Let's keep it simple for now. + private ByteBuffer recvmsgBuffer; + private long recvmsgBufferAddr = -1; + private ByteBuffer sendmsgBuffer; + private long sendmsgBufferAddr = -1; + + private long sendmsgBufferAddr() { + long address = this.sendmsgBufferAddr; + if (address == -1) { + assert sendmsgBuffer == null; + int length = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC; + sendmsgBuffer = Buffer.allocateDirectWithNativeOrder(length); + sendmsgBufferAddr = address = Buffer.memoryAddress(sendmsgBuffer); + + // memset once + PlatformDependent.setMemory(address, length, (byte) 0); + } + return address; + } + + private long recvmsgBufferAddr() { + long address = this.recvmsgBufferAddr; + if (address == -1) { + assert recvmsgBuffer == null; + int length = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC; + recvmsgBuffer = Buffer.allocateDirectWithNativeOrder(length); + recvmsgBufferAddr = address = Buffer.memoryAddress(recvmsgBuffer); + + // memset once + PlatformDependent.setMemory(address, length, (byte) 0); + } + return address; + } + + void releaseBuffers() { + if (sendmsgBuffer != null) { + Buffer.free(sendmsgBuffer); + sendmsgBuffer = null; + sendmsgBufferAddr = -1; + } + + if (recvmsgBuffer != null) { + Buffer.free(recvmsgBuffer); + recvmsgBuffer = null; + recvmsgBufferAddr = -1; + } + } + + @Override + protected void readComplete0(int res) { + final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + final ChannelPipeline pipeline = pipeline(); + ByteBuf byteBuf = this.readBuffer; + this.readBuffer = null; + assert byteBuf != null; + boolean recvmsg = this.recvMsg; + this.recvMsg = false; + + try { + if (res < 0) { + // If res is negative we should pass it to ioResult(...) which will either throw + // or convert it to 0 if we could not read because the socket was not readable. + allocHandle.lastBytesRead(ioResult("io_uring read / recvmsg", res)); + } else if (res > 0) { + byteBuf.writerIndex(byteBuf.writerIndex() + res); + allocHandle.lastBytesRead(res); + } else { + allocHandle.lastBytesRead(-1); + } + if (allocHandle.lastBytesRead() <= 0) { + // nothing was read, release the buffer. + byteBuf.release(); + byteBuf = null; + + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + return; + } + DatagramPacket packet; + if (!recvmsg) { + packet = new DatagramPacket(byteBuf, IOUringDatagramChannel.this.localAddress(), + IOUringDatagramChannel.this.remoteAddress()); + } else { + long sockaddrAddress = recvmsgBufferAddr() + Native.SIZEOF_MSGHDR; + final InetSocketAddress remote; + if (socket.isIpv6()) { + byte[] bytes = ((IOUringEventLoop) eventLoop()).inet6AddressArray(); + remote = SockaddrIn.readIPv6(sockaddrAddress, bytes); + } else { + byte[] bytes = ((IOUringEventLoop) eventLoop()).inet4AddressArray(); + remote = SockaddrIn.readIPv4(sockaddrAddress, bytes); + } + packet = new DatagramPacket(byteBuf, + IOUringDatagramChannel.this.localAddress(), remote); + } + allocHandle.incMessagesRead(1); + pipeline.fireChannelRead(packet); + byteBuf = null; + if (allocHandle.continueReading()) { + // Let's schedule another read. + scheduleRead(); + } else { + // We did not fill the whole ByteBuf so we should break the "read loop" and try again later. + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + } + } catch (Throwable t) { + if (connected && t instanceof NativeIoException) { + t = translateForConnected((NativeIoException) t); + } + pipeline.fireExceptionCaught(t); + } finally { + if (byteBuf != null) { + byteBuf.release(); + } + } + } + + @Override + protected void scheduleRead0() { + final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + ByteBuf byteBuf = allocHandle.allocate(alloc()); + IOUringSubmissionQueue submissionQueue = submissionQueue(); + + assert readBuffer == null; + readBuffer = byteBuf; + + recvMsg = !isConnected(); + long bufferAddress = byteBuf.memoryAddress(); + allocHandle.attemptedBytesRead(byteBuf.writableBytes()); + + if (!recvMsg) { + submissionQueue.addRead(socket.intValue(), bufferAddress, + byteBuf.writerIndex(), byteBuf.capacity()); + } else { + int addrLen = addrLen(); + long recvmsgBufferAddr = recvmsgBufferAddr(); + long sockaddrAddress = recvmsgBufferAddr + Native.SIZEOF_MSGHDR; + long iovecAddress = sockaddrAddress + addrLen; + + Iov.write(iovecAddress, bufferAddress + byteBuf.writerIndex(), byteBuf.writableBytes()); + MsgHdr.write(recvmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1); + submissionQueue.addRecvmsg(socket.intValue(), recvmsgBufferAddr); + } + } + + private int addrLen() { + return socket.isIpv6() ? Native.SIZEOF_SOCKADDR_IN6 : + Native.SIZEOF_SOCKADDR_IN; + } + + @Override + protected void removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int bytes) { + // When using Datagram we should consider the message written as long as there were any bytes written. + boolean removed = outboundBuffer.remove(); + assert removed; + } + + @Override + void connectComplete(int res) { + if (res >= 0) { + connected = true; + } + super.connectComplete(res); + } + + @Override + protected void scheduleWriteMultiple(ChannelOutboundBuffer in) { + // We always just use scheduleWriteSingle for now. + scheduleWriteSingle(in.current()); + } + + @Override + protected void scheduleWriteSingle(Object msg) { + final ByteBuf data; + InetSocketAddress remoteAddress; + if (msg instanceof AddressedEnvelope) { + @SuppressWarnings("unchecked") + AddressedEnvelope envelope = + (AddressedEnvelope) msg; + data = envelope.content(); + remoteAddress = envelope.recipient(); + } else { + data = (ByteBuf) msg; + remoteAddress = null; + } + + long bufferAddress = data.memoryAddress(); + IOUringSubmissionQueue submissionQueue = submissionQueue(); + if (remoteAddress == null) { + submissionQueue.addWrite(socket.intValue(), bufferAddress, data.readerIndex(), + data.writerIndex()); + } else { + int addrLen = addrLen(); + long sendmsgBufferAddr = sendmsgBufferAddr(); + long sockaddrAddress = sendmsgBufferAddr + Native.SIZEOF_MSGHDR; + long iovecAddress = sockaddrAddress + Native.SIZEOF_SOCKADDR_STORAGE; + + SockaddrIn.write(socket.isIpv6(), sockaddrAddress, remoteAddress); + Iov.write(iovecAddress, bufferAddress + data.readerIndex(), data.readableBytes()); + MsgHdr.write(sendmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1); + submissionQueue.addSendmsg(socket.intValue(), sendmsgBufferAddr); + } + } + } + + private IOException translateForConnected(NativeIoException e) { + // We need to correctly translate connect errors to match NIO behaviour. + if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) { + PortUnreachableException error = new PortUnreachableException(e.getMessage()); + error.initCause(e); + return error; + } + return e; + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannelConfig.java new file mode 100644 index 0000000000..ca62dd4762 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannelConfig.java @@ -0,0 +1,466 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.socket.DatagramChannelConfig; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.util.Map; + +public final class IOUringDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig { + private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048); + private boolean activeOnOpen; + + IOUringDatagramChannelConfig(AbstractIOUringChannel channel) { + super(channel); + setRecvByteBufAllocator(DEFAULT_RCVBUF_ALLOCATOR); + } + + @Override + @SuppressWarnings("deprecation") + public Map, Object> getOptions() { + return getOptions( + super.getOptions(), + ChannelOption.SO_BROADCAST, ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF, + ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED, + ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, + ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, + IOUringChannelOption.SO_REUSEPORT, IOUringChannelOption.IP_FREEBIND, + IOUringChannelOption.IP_TRANSPARENT); + } + + @SuppressWarnings({ "unchecked", "deprecation" }) + @Override + public T getOption(ChannelOption option) { + if (option == ChannelOption.SO_BROADCAST) { + return (T) Boolean.valueOf(isBroadcast()); + } + if (option == ChannelOption.SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == ChannelOption.SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == ChannelOption.SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) { + return (T) Boolean.valueOf(isLoopbackModeDisabled()); + } + if (option == ChannelOption.IP_MULTICAST_ADDR) { + return (T) getInterface(); + } + if (option == ChannelOption.IP_MULTICAST_IF) { + return (T) getNetworkInterface(); + } + if (option == ChannelOption.IP_MULTICAST_TTL) { + return (T) Integer.valueOf(getTimeToLive()); + } + if (option == ChannelOption.IP_TOS) { + return (T) Integer.valueOf(getTrafficClass()); + } + if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) { + return (T) Boolean.valueOf(activeOnOpen); + } + if (option == IOUringChannelOption.SO_REUSEPORT) { + return (T) Boolean.valueOf(isReusePort()); + } + if (option == IOUringChannelOption.IP_TRANSPARENT) { + return (T) Boolean.valueOf(isIpTransparent()); + } + if (option == IOUringChannelOption.IP_FREEBIND) { + return (T) Boolean.valueOf(isFreeBind()); + } + return super.getOption(option); + } + + @Override + @SuppressWarnings("deprecation") + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == ChannelOption.SO_BROADCAST) { + setBroadcast((Boolean) value); + } else if (option == ChannelOption.SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == ChannelOption.SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == ChannelOption.SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) { + setLoopbackModeDisabled((Boolean) value); + } else if (option == ChannelOption.IP_MULTICAST_ADDR) { + setInterface((InetAddress) value); + } else if (option == ChannelOption.IP_MULTICAST_IF) { + setNetworkInterface((NetworkInterface) value); + } else if (option == ChannelOption.IP_MULTICAST_TTL) { + setTimeToLive((Integer) value); + } else if (option == ChannelOption.IP_TOS) { + setTrafficClass((Integer) value); + } else if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) { + setActiveOnOpen((Boolean) value); + } else if (option == IOUringChannelOption.SO_REUSEPORT) { + setReusePort((Boolean) value); + } else if (option == IOUringChannelOption.IP_FREEBIND) { + setFreeBind((Boolean) value); + } else if (option == IOUringChannelOption.IP_TRANSPARENT) { + setIpTransparent((Boolean) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + private void setActiveOnOpen(boolean activeOnOpen) { + if (channel.isRegistered()) { + throw new IllegalStateException("Can only changed before channel was registered"); + } + this.activeOnOpen = activeOnOpen; + } + + boolean getActiveOnOpen() { + return activeOnOpen; + } + + @Override + public IOUringDatagramChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } + + @Override + @Deprecated + public IOUringDatagramChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + @Deprecated + public IOUringDatagramChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + public IOUringDatagramChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { + super.setWriteBufferWaterMark(writeBufferWaterMark); + return this; + } + + @Override + public IOUringDatagramChannelConfig setAutoClose(boolean autoClose) { + super.setAutoClose(autoClose); + return this; + } + + @Override + public IOUringDatagramChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public IOUringDatagramChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public IOUringDatagramChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public IOUringDatagramChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public IOUringDatagramChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + @Deprecated + public IOUringDatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public int getSendBufferSize() { + try { + return ((AbstractIOUringChannel) channel).socket.getSendBufferSize(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringDatagramChannelConfig setSendBufferSize(int sendBufferSize) { + try { + ((AbstractIOUringChannel) channel).socket.setSendBufferSize(sendBufferSize); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getReceiveBufferSize() { + try { + return ((AbstractIOUringChannel) channel).socket.getReceiveBufferSize(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringDatagramChannelConfig setReceiveBufferSize(int receiveBufferSize) { + try { + ((AbstractIOUringChannel) channel).socket.setReceiveBufferSize(receiveBufferSize); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getTrafficClass() { + try { + return ((AbstractIOUringChannel) channel).socket.getTrafficClass(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringDatagramChannelConfig setTrafficClass(int trafficClass) { + try { + ((AbstractIOUringChannel) channel).socket.setTrafficClass(trafficClass); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isReuseAddress() { + try { + return ((AbstractIOUringChannel) channel).socket.isReuseAddress(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringDatagramChannelConfig setReuseAddress(boolean reuseAddress) { + try { + ((AbstractIOUringChannel) channel).socket.setReuseAddress(reuseAddress); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isBroadcast() { + try { + return ((AbstractIOUringChannel) channel).socket.isBroadcast(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringDatagramChannelConfig setBroadcast(boolean broadcast) { + try { + ((AbstractIOUringChannel) channel).socket.setBroadcast(broadcast); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isLoopbackModeDisabled() { + try { + return ((AbstractIOUringChannel) channel).socket.isLoopbackModeDisabled(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringDatagramChannelConfig setLoopbackModeDisabled(boolean loopbackModeDisabled) { + try { + ((AbstractIOUringChannel) channel).socket.setLoopbackModeDisabled(loopbackModeDisabled); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getTimeToLive() { + try { + return ((AbstractIOUringChannel) channel).socket.getTimeToLive(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringDatagramChannelConfig setTimeToLive(int ttl) { + try { + ((AbstractIOUringChannel) channel).socket.setTimeToLive(ttl); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public InetAddress getInterface() { + try { + return ((AbstractIOUringChannel) channel).socket.getInterface(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringDatagramChannelConfig setInterface(InetAddress interfaceAddress) { + try { + ((AbstractIOUringChannel) channel).socket.setInterface(interfaceAddress); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public NetworkInterface getNetworkInterface() { + try { + return ((AbstractIOUringChannel) channel).socket.getNetworkInterface(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) { + try { + ((AbstractIOUringChannel) channel).socket.setNetworkInterface(networkInterface); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Returns {@code true} if the SO_REUSEPORT option is set. + */ + public boolean isReusePort() { + try { + return ((AbstractIOUringChannel) channel).socket.isReusePort(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple + * {@link io.netty.channel.socket.DatagramChannel}s to the same port and so receive datagrams with multiple threads. + * + * Be aware this method needs be called before + * {@link io.netty.channel.socket.DatagramChannel#bind(java.net.SocketAddress)} to have + * any affect. + */ + public IOUringDatagramChannelConfig setReusePort(boolean reusePort) { + try { + ((AbstractIOUringChannel) channel).socket.setReusePort(reusePort); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Returns {@code true} if IP_TRANSPARENT is enabled, + * {@code false} otherwise. + */ + public boolean isIpTransparent() { + try { + return ((AbstractIOUringChannel) channel).socket.isIpTransparent(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * If {@code true} is used IP_TRANSPARENT is enabled, + * {@code false} for disable it. Default is disabled. + */ + public IOUringDatagramChannelConfig setIpTransparent(boolean ipTransparent) { + try { + ((AbstractIOUringChannel) channel).socket.setIpTransparent(ipTransparent); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Returns {@code true} if IP_FREEBIND is enabled, + * {@code false} otherwise. + */ + public boolean isFreeBind() { + try { + return ((AbstractIOUringChannel) channel).socket.isIpFreeBind(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * If {@code true} is used IP_FREEBIND is enabled, + * {@code false} for disable it. Default is disabled. + */ + public IOUringDatagramChannelConfig setFreeBind(boolean freeBind) { + try { + ((AbstractIOUringChannel) channel).socket.setIpFreeBind(freeBind); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 3b3bef10af..c978014cc2 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -233,9 +233,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements if (channel == null) { return; } - if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT) { + if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT || op == Native.IORING_OP_RECVMSG) { handleRead(channel, res); - } else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE) { + } else if (op == Native.IORING_OP_WRITEV || + op == Native.IORING_OP_WRITE || op == Native.IORING_OP_SENDMSG) { handleWrite(channel, res); } else if (op == Native.IORING_OP_POLL_ADD) { handlePollAdd(channel, res, data); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index ef8973d79c..45c69cf4c4 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -156,7 +156,14 @@ final class IOUringSubmissionQueue { return enqueueSqe(Native.IORING_OP_POLL_ADD, pollMask, fd, 0, 0, 0, pollMask); } - //return true -> submit() was called + boolean addRecvmsg(int fd, long msgHdr) { + return enqueueSqe(Native.IORING_OP_RECVMSG, 0, fd, msgHdr, 1, 0, 0); + } + + boolean addSendmsg(int fd, long msgHdr) { + return enqueueSqe(Native.IORING_OP_SENDMSG, 0, fd, msgHdr, 1, 0, 0); + } + boolean addRead(int fd, long bufferAddress, int pos, int limit) { return enqueueSqe(Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0, 0); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Iov.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Iov.java new file mode 100644 index 0000000000..45f0d5b319 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Iov.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.util.internal.PlatformDependent; + +/** + * struct iovec { + * void *iov_base; // Starting address + * size_t iov_len; // Number of bytes to transfer + * }; + */ +final class Iov { + + private Iov() { } + + static void write(long iovAddress, long bufferAddress, int length) { + if (Native.SIZEOF_SIZE_T == 4) { + PlatformDependent.putInt(iovAddress + Native.IOVEC_OFFSETOF_IOV_BASE, (int) bufferAddress); + PlatformDependent.putInt(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN, length); + } else { + assert Native.SIZEOF_SIZE_T == 8; + PlatformDependent.putLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_BASE, bufferAddress); + PlatformDependent.putLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN, length); + } + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdr.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdr.java new file mode 100644 index 0000000000..1f03d18968 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdr.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.util.internal.PlatformDependent; + +/** + * struct msghdr { + * void *msg_name; // optional address + * socklen_t msg_namelen; // size of address + * struct iovec*msg_iov; // scatter/gather array + * size_t msg_iovlen; // # elements in msg_iov + * void* msg_control; // ancillary data, see below + * size_t msg_controllen; // ancillary data buffer len + * int msg_flags; // flags on received message + * }; + */ +final class MsgHdr { + + private MsgHdr() { } + + static void write(long memoryAddress, long address, int addressSize, long iovAddress, int iovLength) { + PlatformDependent.putInt(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_NAMELEN, addressSize); + + if (Native.SIZEOF_SIZE_T == 4) { + PlatformDependent.putInt(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_NAME, (int) address); + PlatformDependent.putInt(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_IOV, (int) iovAddress); + PlatformDependent.putInt(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_IOVLEN, iovLength); + } else { + assert Native.SIZEOF_SIZE_T == 8; + PlatformDependent.putLong(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_NAME, address); + PlatformDependent.putLong(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_IOV, iovAddress); + PlatformDependent.putLong(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_IOVLEN, iovLength); + } + + // No msg_control and flags (we assume the memory was memset before) + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index 77f8ba5566..26dab418c7 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -66,6 +66,7 @@ final class Native { static final int SOCK_CLOEXEC = NativeStaticallyReferencedJniMethods.sockCloexec(); static final short AF_INET = (short) NativeStaticallyReferencedJniMethods.afInet(); static final short AF_INET6 = (short) NativeStaticallyReferencedJniMethods.afInet6(); + static final int SIZEOF_SOCKADDR_STORAGE = NativeStaticallyReferencedJniMethods.sizeofSockaddrStorage(); static final int SIZEOF_SOCKADDR_IN = NativeStaticallyReferencedJniMethods.sizeofSockaddrIn(); static final int SIZEOF_SOCKADDR_IN6 = NativeStaticallyReferencedJniMethods.sizeofSockaddrIn6(); static final int SOCKADDR_IN_OFFSETOF_SIN_FAMILY = @@ -73,7 +74,6 @@ final class Native { static final int SOCKADDR_IN_OFFSETOF_SIN_PORT = NativeStaticallyReferencedJniMethods.sockaddrInOffsetofSinPort(); static final int SOCKADDR_IN_OFFSETOF_SIN_ADDR = NativeStaticallyReferencedJniMethods.sockaddrInOffsetofSinAddr(); static final int IN_ADDRESS_OFFSETOF_S_ADDR = NativeStaticallyReferencedJniMethods.inAddressOffsetofSAddr(); - static final int SOCKADDR_IN6_OFFSETOF_SIN6_FAMILY = NativeStaticallyReferencedJniMethods.sockaddrIn6OffsetofSin6Family(); static final int SOCKADDR_IN6_OFFSETOF_SIN6_PORT = @@ -85,7 +85,19 @@ final class Native { static final int SOCKADDR_IN6_OFFSETOF_SIN6_SCOPE_ID = NativeStaticallyReferencedJniMethods.sockaddrIn6OffsetofSin6ScopeId(); static final int IN6_ADDRESS_OFFSETOF_S6_ADDR = NativeStaticallyReferencedJniMethods.in6AddressOffsetofS6Addr(); - + static final int SIZEOF_SIZE_T = NativeStaticallyReferencedJniMethods.sizeofSizeT(); + static final int SIZEOF_IOVEC = NativeStaticallyReferencedJniMethods.sizeofIovec(); + static final int IOVEC_OFFSETOF_IOV_BASE = NativeStaticallyReferencedJniMethods.iovecOffsetofIovBase(); + static final int IOVEC_OFFSETOF_IOV_LEN = NativeStaticallyReferencedJniMethods.iovecOffsetofIovLen(); + static final int SIZEOF_MSGHDR = NativeStaticallyReferencedJniMethods.sizeofMsghdr(); + static final int MSGHDR_OFFSETOF_MSG_NAME = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgName(); + static final int MSGHDR_OFFSETOF_MSG_NAMELEN = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgNamelen(); + static final int MSGHDR_OFFSETOF_MSG_IOV = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgIov(); + static final int MSGHDR_OFFSETOF_MSG_IOVLEN = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgIovlen(); + static final int MSGHDR_OFFSETOF_MSG_CONTROL = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgControl(); + static final int MSGHDR_OFFSETOF_MSG_CONTROLLEN = + NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgControllen(); + static final int MSGHDR_OFFSETOF_MSG_FLAGS = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgFlags(); static final int POLLIN = NativeStaticallyReferencedJniMethods.pollin(); static final int POLLOUT = NativeStaticallyReferencedJniMethods.pollout(); static final int POLLRDHUP = NativeStaticallyReferencedJniMethods.pollrdhup(); @@ -100,6 +112,8 @@ final class Native { static final int IORING_OP_CONNECT = NativeStaticallyReferencedJniMethods.ioringOpConnect(); static final int IORING_OP_CLOSE = NativeStaticallyReferencedJniMethods.ioringOpClose(); static final int IORING_OP_WRITEV = NativeStaticallyReferencedJniMethods.ioringOpWritev(); + static final int IORING_OP_SENDMSG = NativeStaticallyReferencedJniMethods.ioringOpSendmsg(); + static final int IORING_OP_RECVMSG = NativeStaticallyReferencedJniMethods.ioringOpRecvmsg(); static final int IORING_ENTER_GETEVENTS = NativeStaticallyReferencedJniMethods.ioringEnterGetevents(); static final int IOSQE_ASYNC = NativeStaticallyReferencedJniMethods.iosqeAsync(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/NativeStaticallyReferencedJniMethods.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/NativeStaticallyReferencedJniMethods.java index abbced5374..aaae7e2d96 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/NativeStaticallyReferencedJniMethods.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/NativeStaticallyReferencedJniMethods.java @@ -46,6 +46,19 @@ final class NativeStaticallyReferencedJniMethods { static native int sockaddrIn6OffsetofSin6Addr(); static native int sockaddrIn6OffsetofSin6ScopeId(); static native int in6AddressOffsetofS6Addr(); + static native int sizeofSockaddrStorage(); + static native int sizeofSizeT(); + static native int sizeofIovec(); + static native int iovecOffsetofIovBase(); + static native int iovecOffsetofIovLen(); + static native int sizeofMsghdr(); + static native int msghdrOffsetofMsgName(); + static native int msghdrOffsetofMsgNamelen(); + static native int msghdrOffsetofMsgIov(); + static native int msghdrOffsetofMsgIovlen(); + static native int msghdrOffsetofMsgControl(); + static native int msghdrOffsetofMsgControllen(); + static native int msghdrOffsetofMsgFlags(); static native int etime(); static native int ecanceled(); static native int pollin(); @@ -60,6 +73,8 @@ final class NativeStaticallyReferencedJniMethods { static native int ioringOpWrite(); static native int ioringOpConnect(); static native int ioringOpClose(); + static native int ioringOpSendmsg(); + static native int ioringOpRecvmsg(); static native int ioringEnterGetevents(); static native int iosqeAsync(); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/SockaddrIn.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/SockaddrIn.java index 16c110d738..00f8014457 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/SockaddrIn.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/SockaddrIn.java @@ -29,6 +29,14 @@ final class SockaddrIn { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff }; private SockaddrIn() { } + static void write(boolean ipv6, long memory, InetSocketAddress address) { + if (ipv6) { + SockaddrIn.writeIPv6(memory, address.getAddress(), address.getPort()); + } else { + SockaddrIn.writeIPv4(memory, address.getAddress(), address.getPort()); + } + } + /** * * struct sockaddr_in { diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramConnectNotExistsTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramConnectNotExistsTest.java new file mode 100644 index 0000000000..1b485f51e6 --- /dev/null +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramConnectNotExistsTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramConnectNotExistsTest; + +import java.util.List; + +public class IOUringDatagramConnectNotExistsTest extends DatagramConnectNotExistsTest { + + @Override + protected List> newFactories() { + return IOUringSocketTestPermutation.INSTANCE.datagramSocket(); + } +} diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramMulticastIPv6Test.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramMulticastIPv6Test.java new file mode 100644 index 0000000000..306ffbbd89 --- /dev/null +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramMulticastIPv6Test.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramMulticastIPv6Test; + +import java.util.List; + +public class IOUringDatagramMulticastIPv6Test extends DatagramMulticastIPv6Test { + + @Override + protected List> newFactories() { + return IOUringSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily()); + } +} diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramMulticastTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramMulticastTest.java new file mode 100644 index 0000000000..dde2378a83 --- /dev/null +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramMulticastTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramMulticastTest; + +import java.util.List; + +public class IOUringDatagramMulticastTest extends DatagramMulticastTest { + @Override + protected List> newFactories() { + return IOUringSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily()); + } +} diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastIPv6MappedTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastIPv6MappedTest.java new file mode 100644 index 0000000000..b577c48ef2 --- /dev/null +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastIPv6MappedTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory; +import io.netty.testsuite.transport.socket.DatagramUnicastIPv6MappedTest; + +import java.util.List; + +public class IOUringDatagramUnicastIPv6MappedTest extends DatagramUnicastIPv6MappedTest { + @Override + protected List> newFactories() { + return IOUringSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily()); + } +} diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastIPv6Test.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastIPv6Test.java new file mode 100644 index 0000000000..6c52152a8e --- /dev/null +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastIPv6Test.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramUnicastIPv6Test; + +import java.util.List; + +public class IOUringDatagramUnicastIPv6Test extends DatagramUnicastIPv6Test { + @Override + protected List> newFactories() { + return IOUringSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily()); + } + +} diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastTest.java new file mode 100644 index 0000000000..445dc05666 --- /dev/null +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringDatagramUnicastTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramUnicastTest; + +import java.util.List; + +public class IOUringDatagramUnicastTest extends DatagramUnicastTest { + @Override + protected List> newFactories() { + return IOUringSocketTestPermutation.INSTANCE.datagram(InternetProtocolFamily.IPv4); + } +} diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java index 7dd96e2461..7690cbc70d 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java @@ -17,7 +17,11 @@ package io.netty.channel.uring; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.testsuite.transport.TestsuitePermutation; @@ -110,6 +114,49 @@ public class IOUringSocketTestPermutation extends SocketTestPermutation { ); } + @Override + public List> datagram( + final InternetProtocolFamily family) { + // Make the list of Bootstrap factories. + List> bfs = Arrays.>asList( + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(IO_URING_WORKER_GROUP) + .channelFactory(new ChannelFactory() { + @Override + public Channel newChannel() { + return new IOUringDatagramChannel(family); + } + + @Override + public String toString() { + return InternetProtocolFamily.class.getSimpleName() + ".class"; + } + }); + } + }, + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory() { + @Override + public Channel newChannel() { + return new NioDatagramChannel(family); + } + + @Override + public String toString() { + return NioDatagramChannel.class.getSimpleName() + ".class"; + } + }); + } + } + ); + + return combo(bfs, bfs); + } + public boolean isServerFastOpen() { return AccessController.doPrivileged(new PrivilegedAction() { @Override