diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_linuxsocket.c b/transport-native-io_uring/src/main/c/netty_io_uring_linuxsocket.c new file mode 100644 index 0000000000..9c82c2eb18 --- /dev/null +++ b/transport-native-io_uring/src/main/c/netty_io_uring_linuxsocket.c @@ -0,0 +1,793 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/* + * Since glibc 2.8, the _GNU_SOURCE feature test macro must be defined + * (before including any header files) in order to obtain the + * definition of the ucred structure. See + */ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include // TCP_NOTSENT_LOWAT is a linux specific define + +#include "netty_io_uring_linuxsocket.h" +#include "netty_unix_errors.h" +#include "netty_unix_filedescriptor.h" +#include "netty_unix_jni.h" +#include "netty_unix_socket.h" +#include "netty_unix_util.h" + +// TCP_FASTOPEN is defined in linux 3.7. We define this here so older kernels can compile. +#ifndef TCP_FASTOPEN +#define TCP_FASTOPEN 23 +#endif + +// TCP_FASTOPEN_CONNECT is defined in linux 4.11. We define this here so older kernels can compile. +#ifndef TCP_FASTOPEN_CONNECT +#define TCP_FASTOPEN_CONNECT 30 +#endif + +// TCP_NOTSENT_LOWAT is defined in linux 3.12. We define this here so older kernels can compile. +#ifndef TCP_NOTSENT_LOWAT +#define TCP_NOTSENT_LOWAT 25 +#endif + +// SO_BUSY_POLL is defined in linux 3.11. We define this here so older kernels can compile. +#ifndef SO_BUSY_POLL +#define SO_BUSY_POLL 46 +#endif + +static jclass peerCredentialsClass = NULL; +static jmethodID peerCredentialsMethodId = NULL; + +static jfieldID fileChannelFieldId = NULL; +static jfieldID transferredFieldId = NULL; +static jfieldID fdFieldId = NULL; +static jfieldID fileDescriptorFieldId = NULL; + +// JNI Registered Methods Begin +static void netty_io_uring_linuxsocket_setTimeToLive(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setIpMulticastLoop(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jint optval) { + if (ipv6 == JNI_TRUE) { + u_int val = (u_int) optval; + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &val, sizeof(val)); + } else { + u_char val = (u_char) optval; + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_MULTICAST_LOOP, &val, sizeof(val)); + } +} + +static void netty_io_uring_linuxsocket_setInterface(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) { + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_in* interfaceIpAddr; + + memset(&interfaceAddr, 0, sizeof(interfaceAddr)); + + if (ipv6 == JNI_TRUE) { + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &interfaceIndex, sizeof(interfaceIndex)); + } else { + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr"); + return; + } + + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_MULTICAST_IF, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + } +} + +static void netty_io_uring_linuxsocket_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setTcpQuickAck(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_QUICKACK, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setTcpDeferAccept(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setTcpFastOpen(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_KEEPINTVL, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setTcpUserTimeout(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setIpFreeBind(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_FREEBIND, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setIpTransparent(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, SOL_IP, IP_TRANSPARENT, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setIpRecvOrigDestAddr(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_RECVORIGDSTADDR, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_setSoBusyPoll(JNIEnv* env, jclass clazz, jint fd, jint optval) { + netty_unix_socket_setOption(env, fd, SOL_SOCKET, SO_BUSY_POLL, &optval, sizeof(optval)); +} + +static void netty_io_uring_linuxsocket_joinGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) { + struct sockaddr_storage groupAddr; + socklen_t groupAddrSize; + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_in* groupIpAddr; + struct sockaddr_in* interfaceIpAddr; + struct ip_mreq mreq; + + struct sockaddr_in6* groupIp6Addr; + struct ipv6_mreq mreq6; + + memset(&groupAddr, 0, sizeof(groupAddr)); + memset(&interfaceAddr, 0, sizeof(interfaceAddr)); + + if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress"); + return; + } + + switch (groupAddr.ss_family) { + case AF_INET: + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddr"); + return; + } + + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + groupIpAddr = (struct sockaddr_in*) &groupAddr; + + memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr)); + memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + break; + case AF_INET6: + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + mreq6.ipv6mr_interface = interfaceIndex; + + groupIp6Addr = (struct sockaddr_in6*) &groupAddr; + memcpy(&mreq6.ipv6mr_multiaddr, &groupIp6Addr->sin6_addr, sizeof(groupIp6Addr->sin6_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq6, sizeof(mreq6)); + break; + default: + netty_unix_errors_throwIOException(env, "Address family not supported"); + break; + } +} + +static void netty_io_uring_linuxsocket_joinSsmGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex, jbyteArray sourceAddress) { + struct sockaddr_storage groupAddr; + socklen_t groupAddrSize; + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_storage sourceAddr; + socklen_t sourceAddrSize; + struct sockaddr_in* groupIpAddr; + struct sockaddr_in* interfaceIpAddr; + struct sockaddr_in* sourceIpAddr; + struct ip_mreq_source mreq; + + struct group_source_req mreq6; + + memset(&groupAddr, 0, sizeof(groupAddr)); + memset(&sourceAddr, 0, sizeof(sourceAddr)); + memset(&interfaceAddr, 0, sizeof(interfaceAddr)); + + if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress"); + return; + } + + if (netty_unix_socket_initSockaddr(env, ipv6, sourceAddress, scopeId, 0, &sourceAddr, &sourceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for sourceAddress"); + return; + } + + switch (groupAddr.ss_family) { + case AF_INET: + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddress"); + return; + } + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + groupIpAddr = (struct sockaddr_in*) &groupAddr; + sourceIpAddr = (struct sockaddr_in*) &sourceAddr; + memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr)); + memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + memcpy(&mreq.imr_sourceaddr, &sourceIpAddr->sin_addr, sizeof(sourceIpAddr->sin_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq)); + break; + case AF_INET6: + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + mreq6.gsr_group = groupAddr; + mreq6.gsr_interface = interfaceIndex; + mreq6.gsr_source = sourceAddr; + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, MCAST_JOIN_SOURCE_GROUP, &mreq6, sizeof(mreq6)); + break; + default: + netty_unix_errors_throwIOException(env, "Address family not supported"); + break; + } +} + +static void netty_io_uring_linuxsocket_leaveGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex) { + struct sockaddr_storage groupAddr; + socklen_t groupAddrSize; + + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_in* groupIpAddr; + struct sockaddr_in* interfaceIpAddr; + struct ip_mreq mreq; + + struct sockaddr_in6* groupIp6Addr; + struct ipv6_mreq mreq6; + + memset(&groupAddr, 0, sizeof(groupAddr)); + memset(&interfaceAddr, 0, sizeof(interfaceAddr)); + + if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress"); + return; + } + + switch (groupAddr.ss_family) { + case AF_INET: + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddress"); + return; + } + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + groupIpAddr = (struct sockaddr_in*) &groupAddr; + + memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr)); + memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq)); + break; + case AF_INET6: + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + mreq6.ipv6mr_interface = interfaceIndex; + + groupIp6Addr = (struct sockaddr_in6*) &groupAddr; + memcpy(&mreq6.ipv6mr_multiaddr, &groupIp6Addr->sin6_addr, sizeof(groupIp6Addr->sin6_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_LEAVE_GROUP, &mreq6, sizeof(mreq6)); + break; + default: + netty_unix_errors_throwIOException(env, "Address family not supported"); + break; + } +} + +static void netty_io_uring_linuxsocket_leaveSsmGroup(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jint interfaceIndex, jbyteArray sourceAddress) { + struct sockaddr_storage groupAddr; + socklen_t groupAddrSize; + struct sockaddr_storage interfaceAddr; + socklen_t interfaceAddrSize; + struct sockaddr_storage sourceAddr; + socklen_t sourceAddrSize; + struct sockaddr_in* groupIpAddr; + struct sockaddr_in* interfaceIpAddr; + struct sockaddr_in* sourceIpAddr; + + struct ip_mreq_source mreq; + struct group_source_req mreq6; + + memset(&groupAddr, 0, sizeof(groupAddr)); + memset(&sourceAddr, 0, sizeof(sourceAddr)); + memset(&interfaceAddr, 0, sizeof(interfaceAddr)); + + + if (netty_unix_socket_initSockaddr(env, ipv6, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for groupAddress"); + return; + } + + if (netty_unix_socket_initSockaddr(env, ipv6, sourceAddress, scopeId, 0, &sourceAddr, &sourceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for sourceAddress"); + return; + } + + switch (groupAddr.ss_family) { + case AF_INET: + if (netty_unix_socket_initSockaddr(env, ipv6, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr for interfaceAddress"); + return; + } + interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr; + + groupIpAddr = (struct sockaddr_in*) &groupAddr; + sourceIpAddr = (struct sockaddr_in*) &sourceAddr; + memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr)); + memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr)); + memcpy(&mreq.imr_sourceaddr, &sourceIpAddr->sin_addr, sizeof(sourceIpAddr->sin_addr)); + netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_DROP_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq)); + break; + case AF_INET6: + if (interfaceIndex == -1) { + netty_unix_errors_throwIOException(env, "Unable to find network index"); + return; + } + + mreq6.gsr_group = groupAddr; + mreq6.gsr_interface = interfaceIndex; + mreq6.gsr_source = sourceAddr; + netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, MCAST_LEAVE_SOURCE_GROUP, &mreq6, sizeof(mreq6)); + break; + default: + netty_unix_errors_throwIOException(env, "Address family not supported"); + break; + } +} + +static void netty_io_uring_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jbyteArray address, jint scopeId, jbyteArray key) { + struct sockaddr_storage addr; + socklen_t addrSize; + + memset(&addr, 0, sizeof(addr)); + + if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, 0, &addr, &addrSize) == -1) { + netty_unix_errors_throwIOException(env, "Could not init sockaddr"); + return; + } + + struct tcp_md5sig md5sig; + memset(&md5sig, 0, sizeof(md5sig)); + md5sig.tcpm_addr.ss_family = addr.ss_family; + + struct sockaddr_in* ipaddr; + struct sockaddr_in6* ip6addr; + + switch (addr.ss_family) { + case AF_INET: + ipaddr = (struct sockaddr_in*) &addr; + memcpy(&((struct sockaddr_in *) &md5sig.tcpm_addr)->sin_addr, &ipaddr->sin_addr, sizeof(ipaddr->sin_addr)); + break; + case AF_INET6: + ip6addr = (struct sockaddr_in6*) &addr; + memcpy(&((struct sockaddr_in6 *) &md5sig.tcpm_addr)->sin6_addr, &ip6addr->sin6_addr, sizeof(ip6addr->sin6_addr)); + break; + } + + if (key != NULL) { + md5sig.tcpm_keylen = (*env)->GetArrayLength(env, key); + (*env)->GetByteArrayRegion(env, key, 0, md5sig.tcpm_keylen, (void *) &md5sig.tcpm_key); + if ((*env)->ExceptionCheck(env) == JNI_TRUE) { + return; + } + } + + if (setsockopt(fd, IPPROTO_TCP, TCP_MD5SIG, &md5sig, sizeof(md5sig)) < 0) { + netty_unix_errors_throwIOExceptionErrorNo(env, "setsockopt() failed: ", errno); + } +} + +static int netty_io_uring_linuxsocket_getInterface(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) { + if (ipv6 == JNI_TRUE) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; + } else { + struct in_addr optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_MULTICAST_IF, &optval, sizeof(optval)) == -1) { + return -1; + } + + return ntohl(optval.s_addr); + } +} + +static jint netty_io_uring_linuxsocket_getTimeToLive(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + + +static jint netty_io_uring_linuxsocket_getIpMulticastLoop(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6) { + if (ipv6 == JNI_TRUE) { + u_int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &optval, sizeof(optval)) == -1) { + return -1; + } + return (jint) optval; + } else { + u_char optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_MULTICAST_LOOP, &optval, sizeof(optval)) == -1) { + return -1; + } + return (jint) optval; + } +} + +static jint netty_io_uring_linuxsocket_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_getTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_KEEPINTVL, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_getTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_KEEPCNT, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_getTcpUserTimeout(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_isIpFreeBind(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_FREEBIND, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_isIpTransparent(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, SOL_IP, IP_TRANSPARENT, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_isIpRecvOrigDestAddr(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_RECVORIGDSTADDR, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static void netty_io_uring_linuxsocket_getTcpInfo(JNIEnv* env, jclass clazz, jint fd, jlongArray array) { + struct tcp_info tcp_info; + if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_INFO, &tcp_info, sizeof(tcp_info)) == -1) { + return; + } + jlong cArray[32]; + // Expand to 64 bits, then cast away unsigned-ness. + cArray[0] = (jlong) (uint64_t) tcp_info.tcpi_state; + cArray[1] = (jlong) (uint64_t) tcp_info.tcpi_ca_state; + cArray[2] = (jlong) (uint64_t) tcp_info.tcpi_retransmits; + cArray[3] = (jlong) (uint64_t) tcp_info.tcpi_probes; + cArray[4] = (jlong) (uint64_t) tcp_info.tcpi_backoff; + cArray[5] = (jlong) (uint64_t) tcp_info.tcpi_options; + cArray[6] = (jlong) (uint64_t) tcp_info.tcpi_snd_wscale; + cArray[7] = (jlong) (uint64_t) tcp_info.tcpi_rcv_wscale; + cArray[8] = (jlong) (uint64_t) tcp_info.tcpi_rto; + cArray[9] = (jlong) (uint64_t) tcp_info.tcpi_ato; + cArray[10] = (jlong) (uint64_t) tcp_info.tcpi_snd_mss; + cArray[11] = (jlong) (uint64_t) tcp_info.tcpi_rcv_mss; + cArray[12] = (jlong) (uint64_t) tcp_info.tcpi_unacked; + cArray[13] = (jlong) (uint64_t) tcp_info.tcpi_sacked; + cArray[14] = (jlong) (uint64_t) tcp_info.tcpi_lost; + cArray[15] = (jlong) (uint64_t) tcp_info.tcpi_retrans; + cArray[16] = (jlong) (uint64_t) tcp_info.tcpi_fackets; + cArray[17] = (jlong) (uint64_t) tcp_info.tcpi_last_data_sent; + cArray[18] = (jlong) (uint64_t) tcp_info.tcpi_last_ack_sent; + cArray[19] = (jlong) (uint64_t) tcp_info.tcpi_last_data_recv; + cArray[20] = (jlong) (uint64_t) tcp_info.tcpi_last_ack_recv; + cArray[21] = (jlong) (uint64_t) tcp_info.tcpi_pmtu; + cArray[22] = (jlong) (uint64_t) tcp_info.tcpi_rcv_ssthresh; + cArray[23] = (jlong) (uint64_t) tcp_info.tcpi_rtt; + cArray[24] = (jlong) (uint64_t) tcp_info.tcpi_rttvar; + cArray[25] = (jlong) (uint64_t) tcp_info.tcpi_snd_ssthresh; + cArray[26] = (jlong) (uint64_t) tcp_info.tcpi_snd_cwnd; + cArray[27] = (jlong) (uint64_t) tcp_info.tcpi_advmss; + cArray[28] = (jlong) (uint64_t) tcp_info.tcpi_reordering; + cArray[29] = (jlong) (uint64_t) tcp_info.tcpi_rcv_rtt; + cArray[30] = (jlong) (uint64_t) tcp_info.tcpi_rcv_space; + cArray[31] = (jlong) (uint64_t) tcp_info.tcpi_total_retrans; + + (*env)->SetLongArrayRegion(env, array, 0, 32, cArray); +} + +static jint netty_io_uring_linuxsocket_isTcpCork(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_getSoBusyPoll(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, SOL_SOCKET, SO_BUSY_POLL, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_getTcpDeferAccept(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_isTcpQuickAck(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_QUICKACK, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_isTcpFastOpenConnect(JNIEnv* env, jclass clazz, jint fd) { + int optval; + // We call netty_unix_socket_getOption0 directly so we can handle ENOPROTOOPT by ourself. + if (netty_unix_socket_getOption0(fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &optval, sizeof(optval)) == -1) { + if (errno == ENOPROTOOPT) { + // Not supported by the system, so just return 0. + return 0; + } + netty_unix_socket_getOptionHandleError(env, errno); + return -1; + } + return optval; +} + +static jint netty_io_uring_linuxsocket_getTcpNotSentLowAt(JNIEnv* env, jclass clazz, jint fd) { + int optval; + if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_NOTSENT_LOWAT, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + +static jobject netty_io_uring_linuxsocket_getPeerCredentials(JNIEnv *env, jclass clazz, jint fd) { + struct ucred credentials; + if(netty_unix_socket_getOption(env,fd, SOL_SOCKET, SO_PEERCRED, &credentials, sizeof (credentials)) == -1) { + return NULL; + } + jintArray gids = (*env)->NewIntArray(env, 1); + (*env)->SetIntArrayRegion(env, gids, 0, 1, (jint*) &credentials.gid); + return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, credentials.pid, credentials.uid, gids); +} + +static jlong netty_io_uring_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd, jobject fileRegion, jlong base_off, jlong off, jlong len) { + jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId); + if (fileChannel == NULL) { + netty_unix_errors_throwRuntimeException(env, "failed to get DefaultFileRegion.file"); + return -1; + } + jobject fileDescriptor = (*env)->GetObjectField(env, fileChannel, fileDescriptorFieldId); + if (fileDescriptor == NULL) { + netty_unix_errors_throwRuntimeException(env, "failed to get FileChannelImpl.fd"); + return -1; + } + jint srcFd = (*env)->GetIntField(env, fileDescriptor, fdFieldId); + if (srcFd == -1) { + netty_unix_errors_throwRuntimeException(env, "failed to get FileDescriptor.fd"); + return -1; + } + ssize_t res; + off_t offset = base_off + off; + int err; + do { + res = sendfile(fd, srcFd, &offset, (size_t) len); + } while (res == -1 && ((err = errno) == EINTR)); + if (res < 0) { + return -err; + } + if (res > 0) { + // update the transferred field in DefaultFileRegion + (*env)->SetLongField(env, fileRegion, transferredFieldId, off + res); + } + + return res; +} +// JNI Registered Methods End + +// JNI Method Registration Table Begin +static const JNINativeMethod fixed_method_table[] = { + { "setTimeToLive", "(II)V", (void *) netty_io_uring_linuxsocket_setTimeToLive }, + { "getTimeToLive", "(I)I", (void *) netty_io_uring_linuxsocket_getTimeToLive }, + { "setInterface", "(IZ[BII)V", (void *) netty_io_uring_linuxsocket_setInterface }, + { "getInterface", "(IZ)I", (void *) netty_io_uring_linuxsocket_getInterface }, + { "setIpMulticastLoop", "(IZI)V", (void * ) netty_io_uring_linuxsocket_setIpMulticastLoop }, + { "getIpMulticastLoop", "(IZ)I", (void * ) netty_io_uring_linuxsocket_getIpMulticastLoop }, + { "setTcpCork", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpCork }, + { "setSoBusyPoll", "(II)V", (void *) netty_io_uring_linuxsocket_setSoBusyPoll }, + { "setTcpQuickAck", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpQuickAck }, + { "setTcpDeferAccept", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpDeferAccept }, + { "setTcpNotSentLowAt", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpNotSentLowAt }, + { "isTcpCork", "(I)I", (void *) netty_io_uring_linuxsocket_isTcpCork }, + { "getSoBusyPoll", "(I)I", (void *) netty_io_uring_linuxsocket_getSoBusyPoll }, + { "getTcpDeferAccept", "(I)I", (void *) netty_io_uring_linuxsocket_getTcpDeferAccept }, + { "getTcpNotSentLowAt", "(I)I", (void *) netty_io_uring_linuxsocket_getTcpNotSentLowAt }, + { "isTcpQuickAck", "(I)I", (void *) netty_io_uring_linuxsocket_isTcpQuickAck }, + { "setTcpFastOpen", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpFastOpen }, + { "setTcpFastOpenConnect", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpFastOpenConnect }, + { "isTcpFastOpenConnect", "(I)I", (void *) netty_io_uring_linuxsocket_isTcpFastOpenConnect }, + { "setTcpKeepIdle", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpKeepIdle }, + { "setTcpKeepIntvl", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpKeepIntvl }, + { "setTcpKeepCnt", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpKeepCnt }, + { "setTcpUserTimeout", "(II)V", (void *) netty_io_uring_linuxsocket_setTcpUserTimeout }, + { "setIpFreeBind", "(II)V", (void *) netty_io_uring_linuxsocket_setIpFreeBind }, + { "setIpTransparent", "(II)V", (void *) netty_io_uring_linuxsocket_setIpTransparent }, + { "setIpRecvOrigDestAddr", "(II)V", (void *) netty_io_uring_linuxsocket_setIpRecvOrigDestAddr }, + { "getTcpKeepIdle", "(I)I", (void *) netty_io_uring_linuxsocket_getTcpKeepIdle }, + { "getTcpKeepIntvl", "(I)I", (void *) netty_io_uring_linuxsocket_getTcpKeepIntvl }, + { "getTcpKeepCnt", "(I)I", (void *) netty_io_uring_linuxsocket_getTcpKeepCnt }, + { "getTcpUserTimeout", "(I)I", (void *) netty_io_uring_linuxsocket_getTcpUserTimeout }, + { "isIpFreeBind", "(I)I", (void *) netty_io_uring_linuxsocket_isIpFreeBind }, + { "isIpTransparent", "(I)I", (void *) netty_io_uring_linuxsocket_isIpTransparent }, + { "isIpRecvOrigDestAddr", "(I)I", (void *) netty_io_uring_linuxsocket_isIpRecvOrigDestAddr }, + { "getTcpInfo", "(I[J)V", (void *) netty_io_uring_linuxsocket_getTcpInfo }, + { "setTcpMd5Sig", "(IZ[BI[B)V", (void *) netty_io_uring_linuxsocket_setTcpMd5Sig }, + { "joinGroup", "(IZ[B[BII)V", (void *) netty_io_uring_linuxsocket_joinGroup }, + { "joinSsmGroup", "(IZ[B[BII[B)V", (void *) netty_io_uring_linuxsocket_joinSsmGroup }, + { "leaveGroup", "(IZ[B[BII)V", (void *) netty_io_uring_linuxsocket_leaveGroup }, + { "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_io_uring_linuxsocket_leaveSsmGroup } + // "sendFile" has a dynamic signature +}; + +static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); + +static jint dynamicMethodsTableSize() { + return fixed_method_table_size + 2; // 2 is for the dynamic method signatures. +} + +static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) { + char* dynamicTypeName = NULL; + size_t size = sizeof(JNINativeMethod) * dynamicMethodsTableSize(); + JNINativeMethod* dynamicMethods = malloc(size); + if (dynamicMethods == NULL) { + return NULL; + } + memset(dynamicMethods, 0, size); + memcpy(dynamicMethods, fixed_method_table, sizeof(fixed_method_table)); + + JNINativeMethod* dynamicMethod = &dynamicMethods[fixed_method_table_size]; + NETTY_PREPEND(packagePrefix, "io/netty/channel/unix/PeerCredentials;", dynamicTypeName, error); + NETTY_PREPEND("(I)L", dynamicTypeName, dynamicMethod->signature, error); + dynamicMethod->name = "getPeerCredentials"; + dynamicMethod->fnPtr = (void *) netty_io_uring_linuxsocket_getPeerCredentials; + netty_unix_util_free_dynamic_name(&dynamicTypeName); + + ++dynamicMethod; + NETTY_PREPEND(packagePrefix, "io/netty/channel/DefaultFileRegion;JJJ)J", dynamicTypeName, error); + NETTY_PREPEND("(IL", dynamicTypeName, dynamicMethod->signature, error); + dynamicMethod->name = "sendFile"; + dynamicMethod->fnPtr = (void *) netty_io_uring_linuxsocket_sendFile; + netty_unix_util_free_dynamic_name(&dynamicTypeName); + return dynamicMethods; +error: + free(dynamicTypeName); + netty_unix_util_free_dynamic_methods_table(dynamicMethods, fixed_method_table_size, dynamicMethodsTableSize()); + return NULL; +} + +// JNI Method Registration Table End + +jint netty_io_uring_linuxsocket_JNI_OnLoad(JNIEnv* env, const char* packagePrefix) { + int ret = JNI_ERR; + char* nettyClassName = NULL; + jclass fileRegionCls = NULL; + jclass fileChannelCls = NULL; + jclass fileDescriptorCls = NULL; + // Register the methods which are not referenced by static member variables + JNINativeMethod* dynamicMethods = createDynamicMethodsTable(packagePrefix); + if (dynamicMethods == NULL) { + goto done; + } + if (netty_unix_util_register_natives(env, + packagePrefix, + "io/netty/channel/uring/LinuxSocket", + dynamicMethods, + dynamicMethodsTableSize()) != 0) { + goto done; + } + + NETTY_PREPEND(packagePrefix, "io/netty/channel/unix/PeerCredentials", nettyClassName, done); + NETTY_LOAD_CLASS(env, peerCredentialsClass, nettyClassName, done); + netty_unix_util_free_dynamic_name(&nettyClassName); + + NETTY_GET_METHOD(env, peerCredentialsClass, peerCredentialsMethodId, "", "(II[I)V", done); + + NETTY_PREPEND(packagePrefix, "io/netty/channel/DefaultFileRegion", nettyClassName, done); + NETTY_FIND_CLASS(env, fileRegionCls, nettyClassName, done); + netty_unix_util_free_dynamic_name(&nettyClassName); + + NETTY_GET_FIELD(env, fileRegionCls, fileChannelFieldId, "file", "Ljava/nio/channels/FileChannel;", done); + NETTY_GET_FIELD(env, fileRegionCls, transferredFieldId, "transferred", "J", done); + + NETTY_FIND_CLASS(env, fileChannelCls, "sun/nio/ch/FileChannelImpl", done); + NETTY_GET_FIELD(env, fileChannelCls, fileDescriptorFieldId, "fd", "Ljava/io/FileDescriptor;", done); + + NETTY_FIND_CLASS(env, fileDescriptorCls, "java/io/FileDescriptor", done); + NETTY_GET_FIELD(env, fileDescriptorCls, fdFieldId, "fd", "I", done); + + ret = NETTY_JNI_VERSION; +done: + netty_unix_util_free_dynamic_methods_table(dynamicMethods, fixed_method_table_size, dynamicMethodsTableSize()); + free(nettyClassName); + + return ret; +} + +void netty_io_uring_linuxsocket_JNI_OnUnLoad(JNIEnv* env) { + NETTY_UNLOAD_CLASS(env, peerCredentialsClass); +} diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_linuxsocket.h b/transport-native-io_uring/src/main/c/netty_io_uring_linuxsocket.h new file mode 100644 index 0000000000..b117500fd2 --- /dev/null +++ b/transport-native-io_uring/src/main/c/netty_io_uring_linuxsocket.h @@ -0,0 +1,26 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +#ifndef NETTY_IO_URING_LINUXSOCKET_H_ +#define NETTY_IO_URING_LINUXSOCKET_H_ + +#include + +// JNI initialization hooks. Users of this file are responsible for calling these in the JNI_OnLoad and JNI_OnUnload methods. +jint netty_io_uring_linuxsocket_JNI_OnLoad(JNIEnv* env, const char* packagePrefix); +void netty_io_uring_linuxsocket_JNI_OnUnLoad(JNIEnv* env); + +#endif diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index 3c1ab0149c..d8093d2649 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -32,6 +32,7 @@ #include #include "syscall.h" +#include "netty_io_uring_linuxsocket.h" #include #include #include @@ -228,20 +229,23 @@ static void netty_epoll_native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, static void netty_io_uring_ring_buffer_exit(JNIEnv *env, jclass class, jobject ringBuffer) { // Find the id of the Java method to be called + jclass ringBufferClass = (*env)->GetObjectClass(env, ringBuffer); jmethodID submissionQueueMethodId = (*env)->GetMethodID(env, ringBufferClass, "getIoUringSubmissionQueue", "()Lio/netty/channel/uring/IOUringSubmissionQueue;"); jmethodID completionQueueMethodId = (*env)->GetMethodID(env, ringBufferClass, "getIoUringCompletionQueue", "()Lio/netty/channel/uring/IOUringCompletionQueue;"); jobject submissionQueue = (*env)->CallObjectMethod(env, ringBuffer, submissionQueueMethodId); jobject completionQueue = (*env)->CallObjectMethod(env, ringBuffer, completionQueueMethodId); + jclass submissionQueueClass = (*env)->GetObjectClass(env, submissionQueue); + jclass completionQueueClass = (*env)->GetObjectClass(env, completionQueue); - jmethodID submissionQueueArrayAddressMethodId = (*env)->GetMethodID(env, ioUringSubmissionQueueClass, "getSubmissionQueueArrayAddress", "()J"); - jmethodID submissionQueueKringEntriesAddressMethodId = (*env)->GetMethodID(env, ioUringSubmissionQueueClass, "getKRingEntriesAddress", "()J"); - jmethodID submissionQueueRingFdMethodId = (*env)->GetMethodID(env, ioUringSubmissionQueueClass, "getRingFd", "()I"); - jmethodID submissionQueueRingAddressMethodId = (*env)->GetMethodID(env, ioUringSubmissionQueueClass, "getRingAddress", "()J"); - jmethodID submissionQueueRingSizeMethodId = (*env)->GetMethodID(env, ioUringSubmissionQueueClass, "getRingSize", "()I"); + jmethodID submissionQueueArrayAddressMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getSubmissionQueueArrayAddress", "()J"); + jmethodID submissionQueueKringEntriesAddressMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getKRingEntriesAddress", "()J"); + jmethodID submissionQueueRingFdMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingFd", "()I"); + jmethodID submissionQueueRingAddressMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingAddress", "()J"); + jmethodID submissionQueueRingSizeMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingSize", "()I"); - jmethodID completionQueueRingAddressMethodId = (*env)->GetMethodID(env, ioUringCompletionQueueClass, "getRingAddress", "()J"); - jmethodID completionQueueRingSizeMethodId = (*env)->GetMethodID(env, ioUringCompletionQueueClass, "getRingSize", "()I"); + jmethodID completionQueueRingAddressMethodId = (*env)->GetMethodID(env, completionQueueClass, "getRingAddress", "()J"); + jmethodID completionQueueRingSizeMethodId = (*env)->GetMethodID(env, completionQueueClass, "getRingSize", "()I"); jlong submissionQueueArrayAddress = (*env)->CallLongMethod(env, submissionQueue, submissionQueueArrayAddressMethodId); jlong submissionQueueKringEntriesAddress = (*env)->CallLongMethod(env, submissionQueue, submissionQueueKringEntriesAddressMethodId); @@ -273,7 +277,8 @@ static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) { int ring_fd = sys_io_uring_setup((int)entries, &p); //Todo - if (ring_fd < -1) { + if (ring_fd < 0) { + printf("RingFd error: %d\n", ring_fd); //throw Exception return NULL; } @@ -335,6 +340,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) { int filedescriptorOnLoadCalled = 0; int socketOnLoadCalled = 0; int bufferOnLoadCalled = 0; + int linuxsocketOnLoadCalled = 0; JNIEnv *env; char *nettyClassName = NULL; @@ -393,6 +399,11 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) { } bufferOnLoadCalled = 1; + if (netty_io_uring_linuxsocket_JNI_OnLoad(env, packagePrefix) == JNI_ERR) { + goto done; + } + linuxsocketOnLoadCalled = 1; + NETTY_PREPEND(packagePrefix, "io/netty/channel/uring/RingBuffer", nettyClassName, done); NETTY_LOAD_CLASS(env, ringBufferClass, nettyClassName, done); @@ -435,6 +446,9 @@ done: if (bufferOnLoadCalled == 1) { netty_unix_buffer_JNI_OnUnLoad(env); } + if (linuxsocketOnLoadCalled == 1) { + netty_io_uring_linuxsocket_JNI_OnUnLoad(env); + } ringBufferMethodId = NULL; ioUringSubmissionQueueMethodId = NULL; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 502bbb5575..f748ca14b5 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -41,14 +41,14 @@ import static io.netty.util.internal.ObjectUtil.*; abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); - final Socket socket; + final LinuxSocket socket; protected volatile boolean active; boolean uringInReadyPending; private volatile SocketAddress local; private volatile SocketAddress remote; - AbstractIOUringChannel(final Channel parent, Socket socket) { + AbstractIOUringChannel(final Channel parent, LinuxSocket socket) { super(parent); this.socket = checkNotNull(socket, "fd"); this.active = true; @@ -61,6 +61,17 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } + protected AbstractIOUringChannel(final Channel parent, LinuxSocket socket, boolean active) { + super(parent); + this.socket = checkNotNull(socket, "fd"); + this.active = active; + + if (active) { + this.local = socket.localAddress(); + this.remote = socket.remoteAddress(); + } + } + public boolean isOpen() { return socket.isOpen(); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 2708563a47..3900d54e08 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -26,10 +26,10 @@ import java.net.SocketAddress; abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel { AbstractIOUringServerChannel(int fd) { - super(null, new Socket(fd)); + super(null, new LinuxSocket(fd)); } - AbstractIOUringServerChannel(Socket fd) { + AbstractIOUringServerChannel(LinuxSocket fd) { super(null, fd); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelOption.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelOption.java new file mode 100644 index 0000000000..5ee6620c84 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelOption.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.channel.ChannelOption; +import io.netty.channel.unix.UnixChannelOption; + +import java.net.InetAddress; +import java.util.Map; + +public class IOUringChannelOption extends UnixChannelOption { + + public static final ChannelOption TCP_CORK = valueOf(IOUringChannelOption.class, "TCP_CORK"); + public static final ChannelOption TCP_NOTSENT_LOWAT = + valueOf(IOUringChannelOption.class, "TCP_NOTSENT_LOWAT"); + public static final ChannelOption TCP_KEEPIDLE = valueOf(IOUringChannelOption.class, "TCP_KEEPIDLE"); + public static final ChannelOption TCP_KEEPINTVL = valueOf(IOUringChannelOption.class, "TCP_KEEPINTVL"); + public static final ChannelOption TCP_KEEPCNT = valueOf(IOUringChannelOption.class, "TCP_KEEPCNT"); + public static final ChannelOption TCP_USER_TIMEOUT = + valueOf(IOUringChannelOption.class, "TCP_USER_TIMEOUT"); + public static final ChannelOption IP_FREEBIND = valueOf("IP_FREEBIND"); + public static final ChannelOption IP_TRANSPARENT = valueOf("IP_TRANSPARENT"); + public static final ChannelOption IP_RECVORIGDSTADDR = valueOf("IP_RECVORIGDSTADDR"); + public static final ChannelOption TCP_FASTOPEN = valueOf(IOUringChannelOption.class, "TCP_FASTOPEN"); + public static final ChannelOption TCP_FASTOPEN_CONNECT = + valueOf(IOUringChannelOption.class, "TCP_FASTOPEN_CONNECT"); + public static final ChannelOption TCP_DEFER_ACCEPT = + ChannelOption.valueOf(IOUringChannelOption.class, "TCP_DEFER_ACCEPT"); + public static final ChannelOption TCP_QUICKACK = valueOf(IOUringChannelOption.class, "TCP_QUICKACK"); + public static final ChannelOption> TCP_MD5SIG = valueOf("TCP_MD5SIG"); +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java index c8d40f3629..656ba3fb02 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java @@ -20,8 +20,13 @@ import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; +import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; public final class IOUringServerSocketChannel extends AbstractIOUringServerChannel implements ServerSocketChannel { private final IOUringServerSocketChannelConfig config; @@ -38,12 +43,7 @@ public final class IOUringServerSocketChannel extends AbstractIOUringServerChann @Override Channel newChildChannel(int fd) throws Exception { - return new IOUringSocketChannel(this, new Socket(fd)); - } - - @Override - public ServerSocketChannel parent() { - return (ServerSocketChannel) super.parent(); + return new IOUringSocketChannel(this, new LinuxSocket(fd)); } @Override @@ -59,8 +59,7 @@ public final class IOUringServerSocketChannel extends AbstractIOUringServerChann @Override public void doBind(SocketAddress localAddress) throws Exception { super.doBind(localAddress); - //Todo set config option - socket.listen(500); + socket.listen(config.getBacklog()); active = true; } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java index be825733f8..3910a9875e 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java @@ -15,8 +15,18 @@ */ package io.netty.channel.uring; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.ServerSocketChannelConfig; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Map; + public class IOUringServerSocketChannelConfig extends IOUringServerChannelConfig implements ServerSocketChannelConfig { IOUringServerSocketChannelConfig(AbstractIOUringServerChannel channel) { @@ -29,4 +39,241 @@ public class IOUringServerSocketChannelConfig extends IOUringServerChannelConfig super.setReuseAddress(reuseAddress); return this; } + + @Override + public Map, Object> getOptions() { + return getOptions(super.getOptions(), IOUringChannelOption.SO_REUSEPORT, IOUringChannelOption.IP_FREEBIND, + IOUringChannelOption.IP_TRANSPARENT, IOUringChannelOption.TCP_DEFER_ACCEPT); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == IOUringChannelOption.SO_REUSEPORT) { + return (T) Boolean.valueOf(isReusePort()); + } + if (option == IOUringChannelOption.IP_FREEBIND) { + return (T) Boolean.valueOf(isFreeBind()); + } + if (option == IOUringChannelOption.IP_TRANSPARENT) { + return (T) Boolean.valueOf(isIpTransparent()); + } + if (option == IOUringChannelOption.TCP_DEFER_ACCEPT) { + return (T) Integer.valueOf(getTcpDeferAccept()); + } + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == IOUringChannelOption.SO_REUSEPORT) { + setReusePort((Boolean) value); + } else if (option == IOUringChannelOption.IP_FREEBIND) { + setFreeBind((Boolean) value); + } else if (option == IOUringChannelOption.IP_TRANSPARENT) { + setIpTransparent((Boolean) value); + } else if (option == IOUringChannelOption.TCP_DEFER_ACCEPT) { + setTcpDeferAccept((Integer) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + @Override + public IOUringServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + super.setReceiveBufferSize(receiveBufferSize); + return this; + } + + @Override + public IOUringServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + return this; + } + + @Override + public IOUringServerSocketChannelConfig setBacklog(int backlog) { + super.setBacklog(backlog); + return this; + } + + @Override + public IOUringServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + @Deprecated + public IOUringServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public IOUringServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public IOUringServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public IOUringServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public IOUringServerSocketChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + @Deprecated + public IOUringServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + @Deprecated + public IOUringServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public IOUringServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { + super.setWriteBufferWaterMark(writeBufferWaterMark); + return this; + } + + @Override + public IOUringServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } + +// /** +// * Set the {@code TCP_MD5SIG} option on the socket. See {@code linux/tcp.h} for more details. +// * Keys can only be set on, not read to prevent a potential leak, as they are confidential. +// * Allowing them being read would mean anyone with access to the channel could get them. +// */ +// public IOUringServerSocketChannelConfig setTcpMd5Sig(Map keys) { +// try { +// ((IOUringServerSocketChannel) channel).setTcpMd5Sig(keys); +// return this; +// } catch (IOException e) { +// throw new ChannelException(e); +// } +// } + + /** + * Returns {@code true} if the SO_REUSEPORT option is set. + */ + public boolean isReusePort() { + try { + return ((IOUringServerSocketChannel) channel).socket.isReusePort(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + +// /** +// * Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple +// * {@link EpollSocketChannel}s to the same port and so accept connections with multiple threads. +// * +// * Be aware this method needs be called before {@link EpollSocketChannel#bind(java.net.SocketAddress)} to have +// * any affect. +// */ + public IOUringServerSocketChannelConfig setReusePort(boolean reusePort) { + try { + ((IOUringServerSocketChannel) channel).socket.setReusePort(reusePort); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Returns {@code true} if IP_FREEBIND is enabled, + * {@code false} otherwise. + */ + public boolean isFreeBind() { + try { + return ((IOUringServerSocketChannel) channel).socket.isIpFreeBind(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * If {@code true} is used IP_FREEBIND is enabled, + * {@code false} for disable it. Default is disabled. + */ + public IOUringServerSocketChannelConfig setFreeBind(boolean freeBind) { + try { + ((IOUringServerSocketChannel) channel).socket.setIpFreeBind(freeBind); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Returns {@code true} if IP_TRANSPARENT is enabled, + * {@code false} otherwise. + */ + public boolean isIpTransparent() { + try { + return ((IOUringServerSocketChannel) channel).socket.isIpTransparent(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * If {@code true} is used IP_TRANSPARENT is enabled, + * {@code false} for disable it. Default is disabled. + */ + public IOUringServerSocketChannelConfig setIpTransparent(boolean transparent) { + try { + ((IOUringServerSocketChannel) channel).socket.setIpTransparent(transparent); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Set the {@code TCP_DEFER_ACCEPT} option on the socket. See {@code man 7 tcp} for more details. + */ + public IOUringServerSocketChannelConfig setTcpDeferAccept(int deferAccept) { + try { + ((IOUringServerSocketChannel) channel).socket.setTcpDeferAccept(deferAccept); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Returns a positive value if TCP_DEFER_ACCEPT is enabled. + */ + public int getTcpDeferAccept() { + try { + return ((IOUringServerSocketChannel) channel).socket.getTcpDeferAccept(); + } catch (IOException e) { + throw new ChannelException(e); + } + } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java index 8f8b2db2a2..94059a3b09 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java @@ -36,7 +36,12 @@ import java.net.SocketAddress; public final class IOUringSocketChannel extends AbstractIOUringChannel implements SocketChannel { private final IOUringSocketChannelConfig config; - IOUringSocketChannel(final Channel parent, final Socket fd) { + public IOUringSocketChannel() { + super(null, LinuxSocket.newSocketStream(), false); + this.config = new IOUringSocketChannelConfig(this); + } + + IOUringSocketChannel(final Channel parent, final LinuxSocket fd) { super(parent, fd); this.config = new IOUringSocketChannelConfig(this); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java index 6495dc174c..7ad44f3f5c 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java @@ -16,7 +16,9 @@ package io.netty.channel.uring; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; @@ -24,25 +26,132 @@ import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.SocketChannelConfig; import java.io.IOException; +import java.net.InetAddress; +import java.util.Map; + +import static io.netty.channel.ChannelOption.*; +import static io.netty.channel.unix.Limits.*; public class IOUringSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { - private volatile boolean allowHalfClosure; + private volatile boolean allowHalfClosure; + private volatile long maxBytesPerGatheringWrite = SSIZE_MAX; - /** - * Creates a new instance. - */ - IOUringSocketChannelConfig(IOUringSocketChannel channel) { + public IOUringSocketChannelConfig(Channel channel) { super(channel); } @Override - public int getReceiveBufferSize() { - try { - return ((IOUringSocketChannel) channel).socket.getReceiveBufferSize(); - } catch (IOException e) { - throw new ChannelException(e); + public Map, Object> getOptions() { + return getOptions( + super.getOptions(), + SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS, + ALLOW_HALF_CLOSURE, IOUringChannelOption.TCP_CORK, IOUringChannelOption.TCP_NOTSENT_LOWAT, + IOUringChannelOption.TCP_KEEPCNT, IOUringChannelOption.TCP_KEEPIDLE, IOUringChannelOption.TCP_KEEPINTVL, + IOUringChannelOption.TCP_QUICKACK, IOUringChannelOption.IP_TRANSPARENT, + IOUringChannelOption.TCP_FASTOPEN_CONNECT); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); } + if (option == SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == TCP_NODELAY) { + return (T) Boolean.valueOf(isTcpNoDelay()); + } + if (option == SO_KEEPALIVE) { + return (T) Boolean.valueOf(isKeepAlive()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_LINGER) { + return (T) Integer.valueOf(getSoLinger()); + } + if (option == IP_TOS) { + return (T) Integer.valueOf(getTrafficClass()); + } + if (option == ALLOW_HALF_CLOSURE) { + return (T) Boolean.valueOf(isAllowHalfClosure()); + } + if (option == IOUringChannelOption.TCP_CORK) { + return (T) Boolean.valueOf(isTcpCork()); + } + if (option == IOUringChannelOption.TCP_NOTSENT_LOWAT) { + return (T) Long.valueOf(getTcpNotSentLowAt()); + } + if (option == IOUringChannelOption.TCP_KEEPIDLE) { + return (T) Integer.valueOf(getTcpKeepIdle()); + } + if (option == IOUringChannelOption.TCP_KEEPINTVL) { + return (T) Integer.valueOf(getTcpKeepIntvl()); + } + if (option == IOUringChannelOption.TCP_KEEPCNT) { + return (T) Integer.valueOf(getTcpKeepCnt()); + } + if (option == IOUringChannelOption.TCP_USER_TIMEOUT) { + return (T) Integer.valueOf(getTcpUserTimeout()); + } + if (option == IOUringChannelOption.TCP_QUICKACK) { + return (T) Boolean.valueOf(isTcpQuickAck()); + } + if (option == IOUringChannelOption.IP_TRANSPARENT) { + return (T) Boolean.valueOf(isIpTransparent()); + } + if (option == IOUringChannelOption.TCP_FASTOPEN_CONNECT) { + return (T) Boolean.valueOf(isTcpFastOpenConnect()); + } + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == TCP_NODELAY) { + setTcpNoDelay((Boolean) value); + } else if (option == SO_KEEPALIVE) { + setKeepAlive((Boolean) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_LINGER) { + setSoLinger((Integer) value); + } else if (option == IP_TOS) { + setTrafficClass((Integer) value); + } else if (option == ALLOW_HALF_CLOSURE) { + setAllowHalfClosure((Boolean) value); + } else if (option == IOUringChannelOption.TCP_CORK) { + setTcpCork((Boolean) value); + } else if (option == IOUringChannelOption.TCP_NOTSENT_LOWAT) { + setTcpNotSentLowAt((Long) value); + } else if (option == IOUringChannelOption.TCP_KEEPIDLE) { + setTcpKeepIdle((Integer) value); + } else if (option == IOUringChannelOption.TCP_KEEPCNT) { + setTcpKeepCnt((Integer) value); + } else if (option == IOUringChannelOption.TCP_KEEPINTVL) { + setTcpKeepIntvl((Integer) value); + } else if (option == IOUringChannelOption.TCP_USER_TIMEOUT) { + setTcpUserTimeout((Integer) value); + } else if (option == IOUringChannelOption.IP_TRANSPARENT) { + setIpTransparent((Boolean) value); + } else if (option == IOUringChannelOption.TCP_QUICKACK) { + setTcpQuickAck((Boolean) value); + } else if (option == IOUringChannelOption.TCP_FASTOPEN_CONNECT) { + setTcpFastOpenConnect((Boolean) value); + } else { + return super.setOption(option, value); + } + + return true; } @Override @@ -99,6 +208,85 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements } } + /** + * Get the {@code TCP_CORK} option on the socket. See {@code man 7 tcp} for more details. + */ + public boolean isTcpCork() { + try { + return ((IOUringSocketChannel) channel).socket.isTcpCork(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Get the {@code SO_BUSY_POLL} option on the socket. See {@code man 7 tcp} for more details. + */ + public int getSoBusyPoll() { + try { + return ((IOUringSocketChannel) channel).socket.getSoBusyPoll(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Get the {@code TCP_NOTSENT_LOWAT} option on the socket. See {@code man 7 tcp} for more details. + * + * @return value is a uint32_t + */ + public long getTcpNotSentLowAt() { + try { + return ((IOUringSocketChannel) channel).socket.getTcpNotSentLowAt(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Get the {@code TCP_KEEPIDLE} option on the socket. See {@code man 7 tcp} for more details. + */ + public int getTcpKeepIdle() { + try { + return ((IOUringSocketChannel) channel).socket.getTcpKeepIdle(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Get the {@code TCP_KEEPINTVL} option on the socket. See {@code man 7 tcp} for more details. + */ + public int getTcpKeepIntvl() { + try { + return ((IOUringSocketChannel) channel).socket.getTcpKeepIntvl(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Get the {@code TCP_KEEPCNT} option on the socket. See {@code man 7 tcp} for more details. + */ + public int getTcpKeepCnt() { + try { + return ((IOUringSocketChannel) channel).socket.getTcpKeepCnt(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Get the {@code TCP_USER_TIMEOUT} option on the socket. See {@code man 7 tcp} for more details. + */ + public int getTcpUserTimeout() { + try { + return ((IOUringSocketChannel) channel).socket.getTcpUserTimeout(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + @Override public IOUringSocketChannelConfig setKeepAlive(boolean keepAlive) { try { @@ -139,12 +327,22 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements public IOUringSocketChannelConfig setSendBufferSize(int sendBufferSize) { try { ((IOUringSocketChannel) channel).socket.setSendBufferSize(sendBufferSize); + calculateMaxBytesPerGatheringWrite(); return this; } catch (IOException e) { throw new ChannelException(e); } } + @Override + public int getReceiveBufferSize() { + try { + return ((IOUringSocketChannel) channel).socket.getReceiveBufferSize(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + @Override public IOUringSocketChannelConfig setSoLinger(int soLinger) { try { @@ -165,6 +363,44 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements } } + /** + * Set the {@code TCP_CORK} option on the socket. See {@code man 7 tcp} for more details. + */ + public IOUringSocketChannelConfig setTcpCork(boolean tcpCork) { + try { + ((IOUringSocketChannel) channel).socket.setTcpCork(tcpCork); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Set the {@code SO_BUSY_POLL} option on the socket. See {@code man 7 tcp} for more details. + */ + public IOUringSocketChannelConfig setSoBusyPoll(int loopMicros) { + try { + ((IOUringSocketChannel) channel).socket.setSoBusyPoll(loopMicros); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Set the {@code TCP_NOTSENT_LOWAT} option on the socket. See {@code man 7 tcp} for more details. + * + * @param tcpNotSentLowAt is a uint32_t + */ + public IOUringSocketChannelConfig setTcpNotSentLowAt(long tcpNotSentLowAt) { + try { + ((IOUringSocketChannel) channel).socket.setTcpNotSentLowAt(tcpNotSentLowAt); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + @Override public IOUringSocketChannelConfig setTrafficClass(int trafficClass) { try { @@ -175,6 +411,151 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements } } + /** + * Set the {@code TCP_KEEPIDLE} option on the socket. See {@code man 7 tcp} for more details. + */ + public IOUringSocketChannelConfig setTcpKeepIdle(int seconds) { + try { + ((IOUringSocketChannel) channel).socket.setTcpKeepIdle(seconds); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Set the {@code TCP_KEEPINTVL} option on the socket. See {@code man 7 tcp} for more details. + */ + public IOUringSocketChannelConfig setTcpKeepIntvl(int seconds) { + try { + ((IOUringSocketChannel) channel).socket.setTcpKeepIntvl(seconds); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * @deprecated use {@link #setTcpKeepCnt(int)} + */ + @Deprecated + public IOUringSocketChannelConfig setTcpKeepCntl(int probes) { + return setTcpKeepCnt(probes); + } + + /** + * Set the {@code TCP_KEEPCNT} option on the socket. See {@code man 7 tcp} for more details. + */ + public IOUringSocketChannelConfig setTcpKeepCnt(int probes) { + try { + ((IOUringSocketChannel) channel).socket.setTcpKeepCnt(probes); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Set the {@code TCP_USER_TIMEOUT} option on the socket. See {@code man 7 tcp} for more details. + */ + public IOUringSocketChannelConfig setTcpUserTimeout(int milliseconds) { + try { + ((IOUringSocketChannel) channel).socket.setTcpUserTimeout(milliseconds); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Returns {@code true} if IP_TRANSPARENT is enabled, + * {@code false} otherwise. + */ + public boolean isIpTransparent() { + try { + return ((IOUringSocketChannel) channel).socket.isIpTransparent(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * If {@code true} is used IP_TRANSPARENT is enabled, + * {@code false} for disable it. Default is disabled. + */ + public IOUringSocketChannelConfig setIpTransparent(boolean transparent) { + try { + ((IOUringSocketChannel) channel).socket.setIpTransparent(transparent); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + +// /** +// * Set the {@code TCP_MD5SIG} option on the socket. See {@code linux/tcp.h} for more details. Keys can only be set +// * on, not read to prevent a potential leak, as they are confidential. Allowing them being read would mean anyone +// * with access to the channel could get them. +// */ +// public IOUringSocketChannelConfig setTcpMd5Sig(Map keys) { +// try { +// ((IOUringSocketChannel) channel).setTcpMd5Sig(keys); +// return this; +// } catch (IOException e) { +// throw new ChannelException(e); +// } +// } + + /** + * Set the {@code TCP_QUICKACK} option on the socket. See TCP_QUICKACK + * for more details. + */ + public IOUringSocketChannelConfig setTcpQuickAck(boolean quickAck) { + try { + ((IOUringSocketChannel) channel).socket.setTcpQuickAck(quickAck); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Returns {@code true} if TCP_QUICKACK is enabled, {@code false} + * otherwise. + */ + public boolean isTcpQuickAck() { + try { + return ((IOUringSocketChannel) channel).socket.isTcpQuickAck(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Set the {@code TCP_FASTOPEN_CONNECT} option on the socket. Requires Linux kernel 4.11 or later. See + * this commit + * for more details. + */ + public IOUringSocketChannelConfig setTcpFastOpenConnect(boolean fastOpenConnect) { + try { + ((IOUringSocketChannel) channel).socket.setTcpFastOpenConnect(fastOpenConnect); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + /** + * Returns {@code true} if {@code TCP_FASTOPEN_CONNECT} is enabled, {@code false} otherwise. + */ + public boolean isTcpFastOpenConnect() { + try { + return ((IOUringSocketChannel) channel).socket.isTcpFastOpenConnect(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + @Override public boolean isAllowHalfClosure() { return allowHalfClosure; @@ -254,4 +635,16 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements super.setMessageSizeEstimator(estimator); return this; } + + final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) { + this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite; + } + + private void calculateMaxBytesPerGatheringWrite() { + // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide. + int newSendBufferSize = getSendBufferSize() << 1; + if (newSendBufferSize > 0) { + setMaxBytesPerGatheringWrite(getSendBufferSize() << 1); + } + } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringTcpInfo.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringTcpInfo.java new file mode 100644 index 0000000000..d42edf1b37 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringTcpInfo.java @@ -0,0 +1,193 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +/** + *

+ * struct tcp_info + * { + * __u8 tcpi_state; + * __u8 tcpi_ca_state; + * __u8 tcpi_retransmits; + * __u8 tcpi_probes; + * __u8 tcpi_backoff; + * __u8 tcpi_options; + * __u8 tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4; + * + * __u32 tcpi_rto; + * __u32 tcpi_ato; + * __u32 tcpi_snd_mss; + * __u32 tcpi_rcv_mss; + * + * __u32 tcpi_unacked; + * __u32 tcpi_sacked; + * __u32 tcpi_lost; + * __u32 tcpi_retrans; + * __u32 tcpi_fackets; + * + * __u32 tcpi_last_data_sent; + * __u32 tcpi_last_ack_sent; + * __u32 tcpi_last_data_recv; + * __u32 tcpi_last_ack_recv; + * + * __u32 tcpi_pmtu; + * __u32 tcpi_rcv_ssthresh; + * __u32 tcpi_rtt; + * __u32 tcpi_rttvar; + * __u32 tcpi_snd_ssthresh; + * __u32 tcpi_snd_cwnd; + * __u32 tcpi_advmss; + * __u32 tcpi_reordering; + * + * __u32 tcpi_rcv_rtt; + * __u32 tcpi_rcv_space; + * + * __u32 tcpi_total_retrans; + * }; + *

+ */ +public final class IOUringTcpInfo { + + final long[] info = new long[32]; + + public int state() { + return (int) info[0]; + } + + public int caState() { + return (int) info[1]; + } + + public int retransmits() { + return (int) info[2]; + } + + public int probes() { + return (int) info[3]; + } + + public int backoff() { + return (int) info[4]; + } + + public int options() { + return (int) info[5]; + } + + public int sndWscale() { + return (int) info[6]; + } + + public int rcvWscale() { + return (int) info[7]; + } + + public long rto() { + return info[8]; + } + + public long ato() { + return info[9]; + } + + public long sndMss() { + return info[10]; + } + + public long rcvMss() { + return info[11]; + } + + public long unacked() { + return info[12]; + } + + public long sacked() { + return info[13]; + } + + public long lost() { + return info[14]; + } + + public long retrans() { + return info[15]; + } + + public long fackets() { + return info[16]; + } + + public long lastDataSent() { + return info[17]; + } + + public long lastAckSent() { + return info[18]; + } + + public long lastDataRecv() { + return info[19]; + } + + public long lastAckRecv() { + return info[20]; + } + + public long pmtu() { + return info[21]; + } + + public long rcvSsthresh() { + return info[22]; + } + + public long rtt() { + return info[23]; + } + + public long rttvar() { + return info[24]; + } + + public long sndSsthresh() { + return info[25]; + } + + public long sndCwnd() { + return info[26]; + } + + public long advmss() { + return info[27]; + } + + public long reordering() { + return info[28]; + } + + public long rcvRtt() { + return info[29]; + } + + public long rcvSpace() { + return info[30]; + } + + public long totalRetrans() { + return info[31]; + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java new file mode 100644 index 0000000000..da79e73fd8 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java @@ -0,0 +1,386 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.channel.ChannelException; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.channel.unix.NativeInetAddress; +import io.netty.channel.unix.PeerCredentials; +import io.netty.channel.unix.Socket; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.SocketUtils; + +import java.io.IOException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.UnknownHostException; +import java.util.Enumeration; + +import static io.netty.channel.unix.Errors.*; + +/** + * A socket which provides access Linux native methods. + */ +final class LinuxSocket extends Socket { + static final InetAddress INET6_ANY = unsafeInetAddrByName("::"); + private static final InetAddress INET_ANY = unsafeInetAddrByName("0.0.0.0"); + private static final long MAX_UINT32_T = 0xFFFFFFFFL; + + LinuxSocket(int fd) { + super(fd); + } + + private InternetProtocolFamily family() { + return ipv6 ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4; + } + + void setTimeToLive(int ttl) throws IOException { + setTimeToLive(intValue(), ttl); + } + + void setInterface(InetAddress address) throws IOException { + final NativeInetAddress a = NativeInetAddress.newInstance(address); + setInterface(intValue(), ipv6, a.address(), a.scopeId(), interfaceIndex(address)); + } + + void setNetworkInterface(NetworkInterface netInterface) throws IOException { + InetAddress address = deriveInetAddress(netInterface, family() == InternetProtocolFamily.IPv6); + if (address.equals(family() == InternetProtocolFamily.IPv4 ? INET_ANY : INET6_ANY)) { + throw new IOException("NetworkInterface does not support " + family()); + } + final NativeInetAddress nativeAddress = NativeInetAddress.newInstance(address); + setInterface(intValue(), ipv6, nativeAddress.address(), nativeAddress.scopeId(), interfaceIndex(netInterface)); + } + + InetAddress getInterface() throws IOException { + NetworkInterface inf = getNetworkInterface(); + if (inf != null) { + Enumeration addresses = SocketUtils.addressesFromNetworkInterface(inf); + if (addresses.hasMoreElements()) { + return addresses.nextElement(); + } + } + return null; + } + + NetworkInterface getNetworkInterface() throws IOException { + int ret = getInterface(intValue(), ipv6); + if (ipv6) { + return PlatformDependent.javaVersion() >= 7 ? NetworkInterface.getByIndex(ret) : null; + } + InetAddress address = inetAddress(ret); + return address != null ? NetworkInterface.getByInetAddress(address) : null; + } + + private static InetAddress inetAddress(int value) { + byte[] var1 = { + (byte) (value >>> 24 & 255), + (byte) (value >>> 16 & 255), + (byte) (value >>> 8 & 255), + (byte) (value & 255) + }; + + try { + return InetAddress.getByAddress(var1); + } catch (UnknownHostException ignore) { + return null; + } + } + + void joinGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException { + final NativeInetAddress g = NativeInetAddress.newInstance(group); + final boolean isIpv6 = group instanceof Inet6Address; + final NativeInetAddress i = NativeInetAddress.newInstance(deriveInetAddress(netInterface, isIpv6)); + if (source != null) { + final NativeInetAddress s = NativeInetAddress.newInstance(source); + joinSsmGroup(intValue(), ipv6, g.address(), i.address(), + g.scopeId(), interfaceIndex(netInterface), s.address()); + } else { + joinGroup(intValue(), ipv6, g.address(), i.address(), g.scopeId(), interfaceIndex(netInterface)); + } + } + + void leaveGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException { + final NativeInetAddress g = NativeInetAddress.newInstance(group); + final boolean isIpv6 = group instanceof Inet6Address; + final NativeInetAddress i = NativeInetAddress.newInstance(deriveInetAddress(netInterface, isIpv6)); + if (source != null) { + final NativeInetAddress s = NativeInetAddress.newInstance(source); + leaveSsmGroup(intValue(), ipv6, g.address(), i.address(), + g.scopeId(), interfaceIndex(netInterface), s.address()); + } else { + leaveGroup(intValue(), ipv6, g.address(), i.address(), g.scopeId(), interfaceIndex(netInterface)); + } + } + + private static int interfaceIndex(NetworkInterface networkInterface) { + return PlatformDependent.javaVersion() >= 7 ? networkInterface.getIndex() : -1; + } + + private static int interfaceIndex(InetAddress address) throws IOException { + if (PlatformDependent.javaVersion() >= 7) { + NetworkInterface iface = NetworkInterface.getByInetAddress(address); + if (iface != null) { + return iface.getIndex(); + } + } + return -1; + } + + void setTcpDeferAccept(int deferAccept) throws IOException { + setTcpDeferAccept(intValue(), deferAccept); + } + + void setTcpQuickAck(boolean quickAck) throws IOException { + setTcpQuickAck(intValue(), quickAck ? 1 : 0); + } + + void setTcpCork(boolean tcpCork) throws IOException { + setTcpCork(intValue(), tcpCork ? 1 : 0); + } + + void setSoBusyPoll(int loopMicros) throws IOException { + setSoBusyPoll(intValue(), loopMicros); + } + + void setTcpNotSentLowAt(long tcpNotSentLowAt) throws IOException { + if (tcpNotSentLowAt < 0 || tcpNotSentLowAt > MAX_UINT32_T) { + throw new IllegalArgumentException("tcpNotSentLowAt must be a uint32_t"); + } + setTcpNotSentLowAt(intValue(), (int) tcpNotSentLowAt); + } + + void setTcpFastOpen(int tcpFastopenBacklog) throws IOException { + setTcpFastOpen(intValue(), tcpFastopenBacklog); + } + + void setTcpFastOpenConnect(boolean tcpFastOpenConnect) throws IOException { + setTcpFastOpenConnect(intValue(), tcpFastOpenConnect ? 1 : 0); + } + + boolean isTcpFastOpenConnect() throws IOException { + return isTcpFastOpenConnect(intValue()) != 0; + } + + void setTcpKeepIdle(int seconds) throws IOException { + setTcpKeepIdle(intValue(), seconds); + } + + void setTcpKeepIntvl(int seconds) throws IOException { + setTcpKeepIntvl(intValue(), seconds); + } + + void setTcpKeepCnt(int probes) throws IOException { + setTcpKeepCnt(intValue(), probes); + } + + void setTcpUserTimeout(int milliseconds) throws IOException { + setTcpUserTimeout(intValue(), milliseconds); + } + + void setIpFreeBind(boolean enabled) throws IOException { + setIpFreeBind(intValue(), enabled ? 1 : 0); + } + + void setIpTransparent(boolean enabled) throws IOException { + setIpTransparent(intValue(), enabled ? 1 : 0); + } + + void setIpRecvOrigDestAddr(boolean enabled) throws IOException { + setIpRecvOrigDestAddr(intValue(), enabled ? 1 : 0); + } + + int getTimeToLive() throws IOException { + return getTimeToLive(intValue()); + } + + void getTcpInfo(IOUringTcpInfo info) throws IOException { + getTcpInfo(intValue(), info.info); + } + + void setTcpMd5Sig(InetAddress address, byte[] key) throws IOException { + final NativeInetAddress a = NativeInetAddress.newInstance(address); + setTcpMd5Sig(intValue(), ipv6, a.address(), a.scopeId(), key); + } + + boolean isTcpCork() throws IOException { + return isTcpCork(intValue()) != 0; + } + + int getSoBusyPoll() throws IOException { + return getSoBusyPoll(intValue()); + } + + int getTcpDeferAccept() throws IOException { + return getTcpDeferAccept(intValue()); + } + + boolean isTcpQuickAck() throws IOException { + return isTcpQuickAck(intValue()) != 0; + } + + long getTcpNotSentLowAt() throws IOException { + return getTcpNotSentLowAt(intValue()) & MAX_UINT32_T; + } + + int getTcpKeepIdle() throws IOException { + return getTcpKeepIdle(intValue()); + } + + int getTcpKeepIntvl() throws IOException { + return getTcpKeepIntvl(intValue()); + } + + int getTcpKeepCnt() throws IOException { + return getTcpKeepCnt(intValue()); + } + + int getTcpUserTimeout() throws IOException { + return getTcpUserTimeout(intValue()); + } + + boolean isIpFreeBind() throws IOException { + return isIpFreeBind(intValue()) != 0; + } + + boolean isIpTransparent() throws IOException { + return isIpTransparent(intValue()) != 0; + } + + boolean isIpRecvOrigDestAddr() throws IOException { + return isIpRecvOrigDestAddr(intValue()) != 0; + } + + PeerCredentials getPeerCredentials() throws IOException { + return getPeerCredentials(intValue()); + } + + boolean isLoopbackModeDisabled() throws IOException { + return getIpMulticastLoop(intValue(), ipv6) == 0; + } + + void setLoopbackModeDisabled(boolean loopbackModeDisabled) throws IOException { + setIpMulticastLoop(intValue(), ipv6, loopbackModeDisabled ? 0 : 1); + } + + long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException { + // Open the file-region as it may be created via the lazy constructor. This is needed as we directly access + // the FileChannel field via JNI. + src.open(); + + long res = sendFile(intValue(), src, baseOffset, offset, length); + if (res >= 0) { + return res; + } + return ioResult("sendfile", (int) res); + } + + private static InetAddress deriveInetAddress(NetworkInterface netInterface, boolean ipv6) { + final InetAddress ipAny = ipv6 ? INET6_ANY : INET_ANY; + if (netInterface != null) { + final Enumeration ias = netInterface.getInetAddresses(); + while (ias.hasMoreElements()) { + final InetAddress ia = ias.nextElement(); + final boolean isV6 = ia instanceof Inet6Address; + if (isV6 == ipv6) { + return ia; + } + } + } + return ipAny; + } + + public static LinuxSocket newSocketStream(boolean ipv6) { + return new LinuxSocket(newSocketStream0(ipv6)); + } + + public static LinuxSocket newSocketStream() { + return newSocketStream(isIPv6Preferred()); + } + + public static LinuxSocket newSocketDgram(boolean ipv6) { + return new LinuxSocket(newSocketDgram0(ipv6)); + } + + public static LinuxSocket newSocketDgram() { + return newSocketDgram(isIPv6Preferred()); + } + + public static LinuxSocket newSocketDomain() { + return new LinuxSocket(newSocketDomain0()); + } + + private static InetAddress unsafeInetAddrByName(String inetName) { + try { + return InetAddress.getByName(inetName); + } catch (UnknownHostException uhe) { + throw new ChannelException(uhe); + } + } + + private static native void joinGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress, + int scopeId, int interfaceIndex) throws IOException; + private static native void joinSsmGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress, + int scopeId, int interfaceIndex, byte[] source) throws IOException; + private static native void leaveGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress, + int scopeId, int interfaceIndex) throws IOException; + private static native void leaveSsmGroup(int fd, boolean ipv6, byte[] group, byte[] interfaceAddress, + int scopeId, int interfaceIndex, byte[] source) throws IOException; + private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset, + long offset, long length) throws IOException; + + private static native int getTcpDeferAccept(int fd) throws IOException; + private static native int isTcpQuickAck(int fd) throws IOException; + private static native int isTcpCork(int fd) throws IOException; + private static native int getSoBusyPoll(int fd) throws IOException; + private static native int getTcpNotSentLowAt(int fd) throws IOException; + private static native int getTcpKeepIdle(int fd) throws IOException; + private static native int getTcpKeepIntvl(int fd) throws IOException; + private static native int getTcpKeepCnt(int fd) throws IOException; + private static native int getTcpUserTimeout(int fd) throws IOException; + private static native int getTimeToLive(int fd) throws IOException; + private static native int isIpFreeBind(int fd) throws IOException; + private static native int isIpTransparent(int fd) throws IOException; + private static native int isIpRecvOrigDestAddr(int fd) throws IOException; + private static native void getTcpInfo(int fd, long[] array) throws IOException; + private static native PeerCredentials getPeerCredentials(int fd) throws IOException; + private static native int isTcpFastOpenConnect(int fd) throws IOException; + + private static native void setTcpDeferAccept(int fd, int deferAccept) throws IOException; + private static native void setTcpQuickAck(int fd, int quickAck) throws IOException; + private static native void setTcpCork(int fd, int tcpCork) throws IOException; + private static native void setSoBusyPoll(int fd, int loopMicros) throws IOException; + private static native void setTcpNotSentLowAt(int fd, int tcpNotSentLowAt) throws IOException; + private static native void setTcpFastOpen(int fd, int tcpFastopenBacklog) throws IOException; + private static native void setTcpFastOpenConnect(int fd, int tcpFastOpenConnect) throws IOException; + private static native void setTcpKeepIdle(int fd, int seconds) throws IOException; + private static native void setTcpKeepIntvl(int fd, int seconds) throws IOException; + private static native void setTcpKeepCnt(int fd, int probes) throws IOException; + private static native void setTcpUserTimeout(int fd, int milliseconds)throws IOException; + private static native void setIpFreeBind(int fd, int freeBind) throws IOException; + private static native void setIpTransparent(int fd, int transparent) throws IOException; + private static native void setIpRecvOrigDestAddr(int fd, int transparent) throws IOException; + private static native void setTcpMd5Sig( + int fd, boolean ipv6, byte[] address, int scopeId, byte[] key) throws IOException; + private static native void setInterface( + int fd, boolean ipv6, byte[] interfaceAddress, int scopeId, int networkInterfaceIndex) throws IOException; + private static native int getInterface(int fd, boolean ipv6); + private static native int getIpMulticastLoop(int fd, boolean ipv6) throws IOException; + private static native void setIpMulticastLoop(int fd, boolean ipv6, int enabled) throws IOException; + private static native void setTimeToLive(int fd, int ttl) throws IOException; +}