diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index b8338baa2e..10b8dad893 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -76,6 +76,11 @@ #define UDP_GRO 104 #endif +#ifdef IP_RECVORIGDSTADDR +#if !defined(SOL_IP) && defined(IPPROTO_IP) +#define SOL_IP IPPROTO_IP +#endif /* !SOL_IP && IPPROTO_IP */ +#endif // IP_RECVORIGDSTADDR // optional extern int epoll_create1(int flags) __attribute__((weak)); @@ -115,11 +120,16 @@ struct mmsghdr { #endif // SYS_sendmmsg // Those are initialized in the init(...) method and cached for performance reasons -static jfieldID packetAddrFieldId = NULL; -static jfieldID packetAddrLenFieldId = NULL; +static jfieldID packetSenderAddrFieldId = NULL; +static jfieldID packetSenderAddrLenFieldId = NULL; +static jfieldID packetSenderScopeIdFieldId = NULL; +static jfieldID packetSenderPortFieldId = NULL; +static jfieldID packetRecipientAddrFieldId = NULL; +static jfieldID packetRecipientAddrLenFieldId = NULL; +static jfieldID packetRecipientScopeIdFieldId = NULL; +static jfieldID packetRecipientPortFieldId = NULL; + static jfieldID packetSegmentSizeFieldId = NULL; -static jfieldID packetScopeIdFieldId = NULL; -static jfieldID packetPortFieldId = NULL; static jfieldID packetMemoryAddressFieldId = NULL; static jfieldID packetCountFieldId = NULL; @@ -344,8 +354,8 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo for (i = 0; i < len; i++) { jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset); - jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId); - jint addrLen = (*env)->GetIntField(env, packet, packetAddrLenFieldId); + jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetRecipientAddrFieldId); + jint addrLen = (*env)->GetIntField(env, packet, packetRecipientAddrLenFieldId); jint packetSegmentSize = (*env)->GetIntField(env, packet, packetSegmentSizeFieldId); if (packetSegmentSize > 0) { msg[i].msg_hdr.msg_control = controls[i]; @@ -357,8 +367,8 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo *((uint16_t *) CMSG_DATA(cm)) = packetSegmentSize; } if (addrLen != 0) { - jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId); - jint port = (*env)->GetIntField(env, packet, packetPortFieldId); + jint scopeId = (*env)->GetIntField(env, packet, packetRecipientScopeIdFieldId); + jint port = (*env)->GetIntField(env, packet, packetRecipientPortFieldId); if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) { return -1; @@ -385,20 +395,17 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo return (jint) res; } -static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len) { - jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId); - - (*env)->SetIntField(env, packet, packetCountFieldId, len); - - struct sockaddr_storage* addr = (struct sockaddr_storage*) msg->msg_name; +static void init_packet_address(JNIEnv* env, jobject packet, struct sockaddr_storage* addr, jfieldID addrFieldId, + jfieldID addrLenFieldId, jfieldID scopeIdFieldId, jfieldID portFieldId) { + jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, addrFieldId); if (addr->ss_family == AF_INET) { struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr; (*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr); - (*env)->SetIntField(env, packet, packetAddrLenFieldId, 4); - (*env)->SetIntField(env, packet, packetScopeIdFieldId, 0); - (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port)); + (*env)->SetIntField(env, packet, addrLenFieldId, 4); + (*env)->SetIntField(env, packet, scopeIdFieldId, 0); + (*env)->SetIntField(env, packet, portFieldId, ntohs(ipaddr->sin_port)); } else { int addrLen = netty_unix_socket_ipAddressLength(addr); struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr; @@ -410,10 +417,17 @@ static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len } else { (*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr); } - (*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen); - (*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id); - (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port)); + (*env)->SetIntField(env, packet, addrLenFieldId, addrLen); + (*env)->SetIntField(env, packet, scopeIdFieldId, ip6addr->sin6_scope_id); + (*env)->SetIntField(env, packet, portFieldId, ntohs(ip6addr->sin6_port)); } +} + +static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len) { + (*env)->SetIntField(env, packet, packetCountFieldId, len); + + init_packet_address(env, packet, (struct sockaddr_storage*) msg->msg_name, packetSenderAddrFieldId, packetSenderAddrLenFieldId, packetSenderScopeIdFieldId, packetSenderPortFieldId); + struct cmsghdr *cmsg = NULL; uint16_t gso_size = 0; uint16_t *gsosizeptr = NULL; @@ -421,8 +435,12 @@ static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) { gsosizeptr = (uint16_t *) CMSG_DATA(cmsg); gso_size = *gsosizeptr; - break; } +#ifdef IP_RECVORIGDSTADDR + else if (cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVORIGDSTADDR) { + init_packet_address(env, packet, (struct sockaddr_storage*) CMSG_DATA(cmsg), packetRecipientAddrFieldId, packetRecipientAddrLenFieldId, packetRecipientScopeIdFieldId, packetRecipientPortFieldId); + } +#endif // IP_RECVORIGDSTADDR } (*env)->SetIntField(env, packet, packetSegmentSizeFieldId, gso_size); } @@ -431,8 +449,8 @@ static jint netty_epoll_native_recvmsg0(JNIEnv* env, jclass clazz, jint fd, jboo struct msghdr msg = { 0 }; struct sockaddr_storage sock_address; int addrSize = sizeof(sock_address); - // Enough space for GRO - char control[CMSG_SPACE(sizeof(uint16_t))] = { 0 }; + // Enough space for GRO and IP_RECVORIGDSTADDR + char control[CMSG_SPACE(sizeof(uint16_t)) + sizeof(struct sockaddr_storage)] = { 0 }; msg.msg_name = &sock_address; msg.msg_namelen = (socklen_t) addrSize; msg.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId); @@ -458,6 +476,16 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo struct sockaddr_storage addr[len]; int addrSize = sizeof(addr); memset(addr, 0, addrSize); + int storageSize = sizeof(struct sockaddr_storage); + char* cntrlbuf = NULL; + +#ifdef IP_RECVORIGDSTADDR + int readLocalAddr = 0; + if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_RECVORIGDSTADDR, + &readLocalAddr, sizeof(readLocalAddr)) < 0) { + cntrlbuf = malloc(sizeof(char) * storageSize * len); + } +#endif // IP_RECVORIGDSTADDR int i; @@ -468,6 +496,11 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo msg[i].msg_hdr.msg_name = addr + i; msg[i].msg_hdr.msg_namelen = (socklen_t) addrSize; + + if (cntrlbuf != NULL) { + msg[i].msg_hdr.msg_control = cntrlbuf + i * storageSize; + msg[i].msg_hdr.msg_controllen = storageSize; + } } ssize_t res; @@ -479,15 +512,18 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo // keep on reading if it was interrupted } while (res == -1 && ((err = errno) == EINTR)); + if (res >= 0) { + for (i = 0; i < res; i++) { + jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset); + init_packet(env, packet, &msg[i].msg_hdr, msg[i].msg_len); + } + } + // Free the control message buffer if needed. + free(cntrlbuf); + if (res < 0) { return -err; } - - for (i = 0; i < res; i++) { - jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset); - init_packet(env, packet, &msg[i].msg_hdr, msg[i].msg_len); - } - return (jint) res; } @@ -735,11 +771,15 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix NETTY_JNI_UTIL_FIND_CLASS(env, nativeDatagramPacketCls, nettyClassName, done); netty_jni_util_free_dynamic_name(&nettyClassName); - NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetAddrFieldId, "addr", "[B", done); - NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetAddrLenFieldId, "addrLen", "I", done); + NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSenderAddrFieldId, "senderAddr", "[B", done); + NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSenderAddrLenFieldId, "senderAddrLen", "I", done); + NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSenderScopeIdFieldId, "senderScopeId", "I", done); + NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSenderPortFieldId, "senderPort", "I", done); + NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetRecipientAddrFieldId, "recipientAddr", "[B", done); + NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetRecipientAddrLenFieldId, "recipientAddrLen", "I", done); + NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetRecipientScopeIdFieldId, "recipientScopeId", "I", done); + NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetRecipientPortFieldId, "recipientPort", "I", done); NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetSegmentSizeFieldId, "segmentSize", "I", done); - NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetScopeIdFieldId, "scopeId", "I", done); - NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetPortFieldId, "port", "I", done); NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetMemoryAddressFieldId, "memoryAddress", "J", done); NETTY_JNI_UTIL_GET_FIELD(env, nativeDatagramPacketCls, packetCountFieldId, "count", "I", done); @@ -763,11 +803,15 @@ done: if (linuxsocketOnLoadCalled == 1) { netty_epoll_linuxsocket_JNI_OnUnLoad(env, packagePrefix); } - packetAddrFieldId = NULL; - packetAddrLenFieldId = NULL; + packetSenderAddrFieldId = NULL; + packetSenderAddrLenFieldId = NULL; + packetSenderScopeIdFieldId = NULL; + packetSenderPortFieldId = NULL; + packetRecipientAddrFieldId = NULL; + packetRecipientAddrLenFieldId = NULL; + packetRecipientScopeIdFieldId = NULL; + packetRecipientPortFieldId = NULL; packetSegmentSizeFieldId = NULL; - packetScopeIdFieldId = NULL; - packetPortFieldId = NULL; packetMemoryAddressFieldId = NULL; packetCountFieldId = NULL; } @@ -790,11 +834,15 @@ static void netty_epoll_native_JNI_OnUnload(JNIEnv* env) { staticPackagePrefix = NULL; } - packetAddrFieldId = NULL; - packetAddrLenFieldId = NULL; + packetSenderAddrFieldId = NULL; + packetSenderAddrLenFieldId = NULL; + packetSenderScopeIdFieldId = NULL; + packetSenderPortFieldId = NULL; + packetRecipientAddrFieldId = NULL; + packetRecipientAddrLenFieldId = NULL; + packetRecipientScopeIdFieldId = NULL; + packetRecipientPortFieldId = NULL; packetSegmentSizeFieldId = NULL; - packetScopeIdFieldId = NULL; - packetPortFieldId = NULL; packetMemoryAddressFieldId = NULL; packetCountFieldId = NULL; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java index 602fc5f690..a33c83cd07 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -142,6 +142,18 @@ final class NativeDatagramPacketArray { } } + private static InetSocketAddress newAddress(byte[] addr, int addrLen, int port, int scopeId, byte[] ipv4Bytes) + throws UnknownHostException { + final InetAddress address; + if (addrLen == ipv4Bytes.length) { + System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen); + address = InetAddress.getByAddress(ipv4Bytes); + } else { + address = Inet6Address.getByAddress(null, addr, scopeId); + } + return new InetSocketAddress(address, port); + } + /** * Used to pass needed data to JNI. */ @@ -152,45 +164,50 @@ final class NativeDatagramPacketArray { private long memoryAddress; private int count; - private final byte[] addr = new byte[16]; + private final byte[] senderAddr = new byte[16]; + private int senderAddrLen; + private int senderScopeId; + private int senderPort; + + private final byte[] recipientAddr = new byte[16]; + private int recipientAddrLen; + private int recipientScopeId; + private int recipientPort; private int segmentSize; - private int addrLen; - private int scopeId; - private int port; private void init(long memoryAddress, int count, int segmentSize, InetSocketAddress recipient) { this.memoryAddress = memoryAddress; this.count = count; this.segmentSize = segmentSize; + this.senderScopeId = 0; + this.senderPort = 0; + this.senderAddrLen = 0; + if (recipient == null) { - this.scopeId = 0; - this.port = 0; - this.addrLen = 0; + this.recipientScopeId = 0; + this.recipientPort = 0; + this.recipientAddrLen = 0; } else { InetAddress address = recipient.getAddress(); if (address instanceof Inet6Address) { - System.arraycopy(address.getAddress(), 0, addr, 0, addr.length); - scopeId = ((Inet6Address) address).getScopeId(); + System.arraycopy(address.getAddress(), 0, recipientAddr, 0, recipientAddr.length); + recipientScopeId = ((Inet6Address) address).getScopeId(); } else { - copyIpv4MappedIpv6Address(address.getAddress(), addr); - scopeId = 0; + copyIpv4MappedIpv6Address(address.getAddress(), recipientAddr); + recipientScopeId = 0; } - addrLen = addr.length; - port = recipient.getPort(); + recipientAddrLen = recipientAddr.length; + recipientPort = recipient.getPort(); } } DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress recipient) throws UnknownHostException { - final InetAddress address; - if (addrLen == ipv4Bytes.length) { - System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen); - address = InetAddress.getByAddress(ipv4Bytes); - } else { - address = Inet6Address.getByAddress(null, addr, scopeId); + InetSocketAddress sender = newAddress(senderAddr, senderAddrLen, senderPort, senderScopeId, ipv4Bytes); + if (recipientAddrLen != 0) { + recipient = newAddress(recipientAddr, recipientAddrLen, recipientPort, recipientScopeId, ipv4Bytes); } - InetSocketAddress sender = new InetSocketAddress(address, port); buffer.writerIndex(count); // UDP_GRO diff --git a/transport-native-unix-common/src/main/c/netty_unix_socket.c b/transport-native-unix-common/src/main/c/netty_unix_socket.c index 4243fbd288..59c9b496a5 100644 --- a/transport-native-unix-common/src/main/c/netty_unix_socket.c +++ b/transport-native-unix-common/src/main/c/netty_unix_socket.c @@ -339,43 +339,9 @@ static jobject _recvFrom(JNIEnv* env, jint fd, void* buffer, jint pos, jint limi socklen_t addrlen = sizeof(addr); ssize_t res; int err; - jobject local = NULL; - -#ifdef IP_RECVORIGDSTADDR - struct sockaddr_storage daddr; - struct iovec iov; - struct cmsghdr* cmsg; - struct msghdr msg; - char cntrlbuf[64]; - - int readLocalAddr; - if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_RECVORIGDSTADDR, - &readLocalAddr, sizeof(readLocalAddr)) < 0) { - readLocalAddr = 0; - } - - if (readLocalAddr) { - iov.iov_base = buffer + pos; - iov.iov_len = (size_t) (limit - pos); - msg.msg_name = (struct sockaddr*) &addr; - msg.msg_namelen = addrlen; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - msg.msg_control = cntrlbuf; - msg.msg_controllen = sizeof(cntrlbuf); - } -#endif do { -#ifdef IP_RECVORIGDSTADDR - if (readLocalAddr) { - res = recvmsg(fd, &msg, 0); - } else { -#endif - res = recvfrom(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr*) &addr, &addrlen); -#ifdef IP_RECVORIGDSTADDR - } -#endif + res = recvfrom(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr*) &addr, &addrlen); // Keep on reading if we was interrupted } while (res == -1 && ((err = errno) == EINTR)); @@ -396,24 +362,7 @@ static jobject _recvFrom(JNIEnv* env, jint fd, void* buffer, jint pos, jint limi return NULL; } -#ifdef IP_RECVORIGDSTADDR -#if !defined(SOL_IP) && defined(IPPROTO_IP) -#define SOL_IP IPPROTO_IP -#endif /* !SOL_IP && IPPROTO_IP */ - if (readLocalAddr) { - for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - if (cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVORIGDSTADDR) { - memcpy (&daddr, CMSG_DATA(cmsg), sizeof (struct sockaddr_storage)); - local = createDatagramSocketAddress(env, &daddr, res, NULL); - if (local == NULL) { - return NULL; - } - break; - } - } - } -#endif - return createDatagramSocketAddress(env, &addr, res, local); + return createDatagramSocketAddress(env, &addr, res, NULL); } void netty_unix_socket_getOptionHandleError(JNIEnv* env, int err) {