Added UDP multicast (with caveats: no ipv6, getInterface, getNetworkI… (#9006)

…nterface, block or loopback-mode-disabled operations).


Motivation:

Provide epoll/native multicast to support high load multicast users (we are using it for a high load telecomm app at my day job).

Modification:

Added support for (ipv4 only) source specific and any source multicast for epoll transport. Some caveats (beyond no ipv6 support initially - there’s a bit of work to add in join and leave group specifically around SSM, as ipv6 uses different data structures for this): no support for disabling loop back mode, retrieval of interface and block operation, all of which tend to be less frequently used.

Result:

Provides epoll transport multicast for IPv4 for common use cases. Understand if you’d prefer to hold off until ipv6 is included but not sure when I’ll be able to get to that.
This commit is contained in:
Steve Buzzard 2019-04-08 14:13:39 -04:00 committed by Norman Maurer
parent 2935944426
commit 5e8fdf06bc
5 changed files with 351 additions and 10 deletions

View File

@ -64,6 +64,32 @@ static jfieldID fdFieldId = NULL;
static jfieldID fileDescriptorFieldId = NULL;
// JNI Registered Methods Begin
static void netty_epoll_linuxsocket_setTimeToLive(JNIEnv* env, jclass clazz, jint fd, jint optval) {
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval));
}
static void netty_epoll_linuxsocket_setInterface(JNIEnv* env, jclass clazz, jint fd, jbyteArray interfaceAddress, jint scopeId) {
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_in* interfaceIpAddr;
struct sockaddr_in6* interfaceIp6Addr;
if (netty_unix_socket_initSockaddr(env, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
return;
}
switch (interfaceAddr.ss_family) {
case AF_INET:
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_MULTICAST_IF, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
break;
case AF_INET6:
interfaceIp6Addr = (struct sockaddr_in6*) &interfaceAddr;
netty_unix_socket_setOption(env, fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &interfaceIp6Addr->sin6_addr, sizeof(interfaceIp6Addr->sin6_addr));
break;
}
}
static void netty_epoll_linuxsocket_setTcpCork(JNIEnv* env, jclass clazz, jint fd, jint optval) {
netty_unix_socket_setOption(env, fd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
}
@ -120,6 +146,148 @@ static void netty_epoll_linuxsocket_setSoBusyPoll(JNIEnv* env, jclass clazz, jin
netty_unix_socket_setOption(env, fd, SOL_SOCKET, SO_BUSY_POLL, &optval, sizeof(optval));
}
static void netty_epoll_linuxsocket_joinGroup(JNIEnv* env, jclass clazz, jint fd, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId) {
struct sockaddr_storage groupAddr;
socklen_t groupAddrSize;
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_in* groupIpAddr;
struct sockaddr_in* interfaceIpAddr;
struct ip_mreq mreq;
if (netty_unix_socket_initSockaddr(env, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) {
return;
}
if (netty_unix_socket_initSockaddr(env, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
return;
}
switch (groupAddr.ss_family) {
case AF_INET:
groupIpAddr = (struct sockaddr_in*) &groupAddr;
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr));
memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
break;
case AF_INET6:
break;
}
}
static void netty_epoll_linuxsocket_joinSsmGroup(JNIEnv* env, jclass clazz, jint fd, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jbyteArray sourceAddress) {
struct sockaddr_storage groupAddr;
socklen_t groupAddrSize;
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_storage sourceAddr;
socklen_t sourceAddrSize;
struct sockaddr_in* groupIpAddr;
struct sockaddr_in* interfaceIpAddr;
struct sockaddr_in* sourceIpAddr;
struct ip_mreq_source mreq;
if (netty_unix_socket_initSockaddr(env, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) {
return;
}
if (netty_unix_socket_initSockaddr(env, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
return;
}
if (netty_unix_socket_initSockaddr(env, sourceAddress, scopeId, 0, &sourceAddr, &sourceAddrSize) == -1) {
return;
}
switch (groupAddr.ss_family) {
case AF_INET:
groupIpAddr = (struct sockaddr_in*) &groupAddr;
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
sourceIpAddr = (struct sockaddr_in*) &sourceAddr;
memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr));
memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
memcpy(&mreq.imr_sourceaddr, &sourceIpAddr->sin_addr, sizeof(sourceIpAddr->sin_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq));
break;
case AF_INET6:
break;
}
}
static void netty_epoll_linuxsocket_leaveGroup(JNIEnv* env, jclass clazz, jint fd, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId) {
struct sockaddr_storage groupAddr;
socklen_t groupAddrSize;
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_in* groupIpAddr;
struct sockaddr_in* interfaceIpAddr;
struct ip_mreq mreq;
if (netty_unix_socket_initSockaddr(env, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) {
return;
}
if (netty_unix_socket_initSockaddr(env, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
return;
}
switch (groupAddr.ss_family) {
case AF_INET:
groupIpAddr = (struct sockaddr_in*) &groupAddr;
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr));
memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq));
break;
case AF_INET6:
break;
}
}
static void netty_epoll_linuxsocket_leaveSsmGroup(JNIEnv* env, jclass clazz, jint fd, jbyteArray groupAddress, jbyteArray interfaceAddress, jint scopeId, jbyteArray sourceAddress) {
struct sockaddr_storage groupAddr;
socklen_t groupAddrSize;
struct sockaddr_storage interfaceAddr;
socklen_t interfaceAddrSize;
struct sockaddr_storage sourceAddr;
socklen_t sourceAddrSize;
struct sockaddr_in* groupIpAddr;
struct sockaddr_in* interfaceIpAddr;
struct sockaddr_in* sourceIpAddr;
struct ip_mreq_source mreq;
if (netty_unix_socket_initSockaddr(env, groupAddress, scopeId, 0, &groupAddr, &groupAddrSize) == -1) {
return;
}
if (netty_unix_socket_initSockaddr(env, interfaceAddress, scopeId, 0, &interfaceAddr, &interfaceAddrSize) == -1) {
return;
}
if (netty_unix_socket_initSockaddr(env, sourceAddress, scopeId, 0, &sourceAddr, &sourceAddrSize) == -1) {
return;
}
switch (groupAddr.ss_family) {
case AF_INET:
groupIpAddr = (struct sockaddr_in*) &groupAddr;
interfaceIpAddr = (struct sockaddr_in*) &interfaceAddr;
sourceIpAddr = (struct sockaddr_in*) &sourceAddr;
memcpy(&mreq.imr_multiaddr, &groupIpAddr->sin_addr, sizeof(groupIpAddr->sin_addr));
memcpy(&mreq.imr_interface, &interfaceIpAddr->sin_addr, sizeof(interfaceIpAddr->sin_addr));
memcpy(&mreq.imr_sourceaddr, &sourceIpAddr->sin_addr, sizeof(sourceIpAddr->sin_addr));
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_DROP_SOURCE_MEMBERSHIP, &mreq, sizeof(mreq));
break;
case AF_INET6:
break;
}
}
static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jbyteArray key) {
struct sockaddr_storage addr;
socklen_t addrSize;
@ -158,6 +326,14 @@ static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint
}
}
static jint netty_epoll_linuxsocket_getTimeToLive(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_TTL, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
static jint netty_epoll_linuxsocket_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_KEEPIDLE, &optval, sizeof(optval)) == -1) {
@ -357,6 +533,9 @@ static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd
// JNI Method Registration Table Begin
static const JNINativeMethod fixed_method_table[] = {
{ "setTimeToLive", "(II)V", (void *) netty_epoll_linuxsocket_setTimeToLive },
{ "getTimeToLive", "(I)I", (void *) netty_epoll_linuxsocket_getTimeToLive },
{ "setInterface", "(I[BI)V", (void *) netty_epoll_linuxsocket_setInterface },
{ "setTcpCork", "(II)V", (void *) netty_epoll_linuxsocket_setTcpCork },
{ "setSoBusyPoll", "(II)V", (void *) netty_epoll_linuxsocket_setSoBusyPoll },
{ "setTcpQuickAck", "(II)V", (void *) netty_epoll_linuxsocket_setTcpQuickAck },
@ -385,7 +564,11 @@ static const JNINativeMethod fixed_method_table[] = {
{ "isIpTransparent", "(I)I", (void *) netty_epoll_linuxsocket_isIpTransparent },
{ "isIpRecvOrigDestAddr", "(I)I", (void *) netty_epoll_linuxsocket_isIpRecvOrigDestAddr },
{ "getTcpInfo", "(I[J)V", (void *) netty_epoll_linuxsocket_getTcpInfo },
{ "setTcpMd5Sig", "(I[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig }
{ "setTcpMd5Sig", "(I[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig },
{ "joinGroup", "(I[B[BI)V", (void *) netty_epoll_linuxsocket_joinGroup },
{ "joinSsmGroup", "(I[B[BI[B)V", (void *) netty_epoll_linuxsocket_joinSsmGroup },
{ "leaveGroup", "(I[B[BI)V", (void *) netty_epoll_linuxsocket_leaveGroup },
{ "leaveSsmGroup", "(I[B[BI[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup }
// "sendFile" has a dynamic signature
};

View File

@ -36,6 +36,7 @@ import io.netty.util.internal.StringUtil;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.PortUnreachableException;
@ -113,7 +114,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return joinGroup(
multicastAddress,
NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
} catch (SocketException e) {
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
@ -145,7 +146,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
requireNonNull(multicastAddress, "multicastAddress");
requireNonNull(networkInterface, "networkInterface");
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
if (multicastAddress instanceof Inet6Address) {
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
} else {
try {
socket.joinGroup(multicastAddress, networkInterface, source);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
}
return promise;
}
@ -159,7 +169,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
try {
return leaveGroup(
multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
} catch (SocketException e) {
} catch (IOException e) {
promise.setFailure(e);
}
return promise;
@ -191,8 +201,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
requireNonNull(multicastAddress, "multicastAddress");
requireNonNull(networkInterface, "networkInterface");
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
if (multicastAddress instanceof Inet6Address) {
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
} else {
try {
socket.leaveGroup(multicastAddress, networkInterface, source);
promise.setSuccess();
} catch (IOException e) {
promise.setFailure(e);
}
}
return promise;
}

View File

@ -326,12 +326,21 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
@Override
public int getTimeToLive() {
return -1;
try {
return ((EpollDatagramChannel) channel).socket.getTimeToLive();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public EpollDatagramChannelConfig setTimeToLive(int ttl) {
throw new UnsupportedOperationException("Multicast not supported");
try {
((EpollDatagramChannel) channel).socket.setTimeToLive(ttl);
return this;
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
@ -341,7 +350,12 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
@Override
public EpollDatagramChannelConfig setInterface(InetAddress interfaceAddress) {
throw new UnsupportedOperationException("Multicast not supported");
try {
((EpollDatagramChannel) channel).socket.setInterface(interfaceAddress);
return this;
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
@ -351,7 +365,12 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
@Override
public EpollDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) {
throw new UnsupportedOperationException("Multicast not supported");
try {
((EpollDatagramChannel) channel).socket.setNetworkInterface(networkInterface);
return this;
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.epoll;
import io.netty.channel.ChannelException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.channel.unix.NativeInetAddress;
@ -24,7 +25,11 @@ import io.netty.util.internal.ThrowableUtil;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Inet6Address;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.Enumeration;
import static io.netty.channel.unix.Errors.ERRNO_EPIPE_NEGATIVE;
import static io.netty.channel.unix.Errors.ioResult;
@ -34,6 +39,8 @@ import static io.netty.channel.unix.Errors.newConnectionResetException;
* A socket which provides access Linux native methods.
*/
final class LinuxSocket extends Socket {
private static final InetAddress INET_ANY = unsafeInetAddrByName("0.0.0.0");
private static final InetAddress INET6_ANY = unsafeInetAddrByName("::");
private static final long MAX_UINT32_T = 0xFFFFFFFFL;
private static final NativeIoException SENDFILE_CONNECTION_RESET_EXCEPTION =
newConnectionResetException("syscall:sendfile(...)", ERRNO_EPIPE_NEGATIVE);
@ -44,6 +51,49 @@ final class LinuxSocket extends Socket {
super(fd);
}
void setTimeToLive(int ttl) throws IOException {
setTimeToLive(intValue(), ttl);
}
void setInterface(InetAddress address) throws IOException {
final NativeInetAddress a = NativeInetAddress.newInstance(address);
setInterface(intValue(), a.address(), a.scopeId());
}
void setNetworkInterface(NetworkInterface netInterface) throws IOException {
final NativeInetAddress i = NativeInetAddress.newInstance(deriveInetAddress(netInterface, false));
if (i.equals(INET_ANY)) {
final NativeInetAddress i6 = NativeInetAddress.newInstance(deriveInetAddress(netInterface, true));
setInterface(intValue(), i6.address(), i6.scopeId());
} else {
setInterface(intValue(), i.address(), i.scopeId());
}
}
void joinGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException {
final NativeInetAddress g = NativeInetAddress.newInstance(group);
final boolean isIpv6 = group instanceof Inet6Address;
final NativeInetAddress i = NativeInetAddress.newInstance(deriveInetAddress(netInterface, isIpv6));
if (source != null) {
final NativeInetAddress s = NativeInetAddress.newInstance(source);
joinSsmGroup(intValue(), g.address(), i.address(), g.scopeId(), s.address());
} else {
joinGroup(intValue(), g.address(), i.address(), g.scopeId());
}
}
void leaveGroup(InetAddress group, NetworkInterface netInterface, InetAddress source) throws IOException {
final NativeInetAddress g = NativeInetAddress.newInstance(group);
final boolean isIpv6 = group instanceof Inet6Address;
final NativeInetAddress i = NativeInetAddress.newInstance(deriveInetAddress(netInterface, isIpv6));
if (source != null) {
final NativeInetAddress s = NativeInetAddress.newInstance(source);
leaveSsmGroup(intValue(), g.address(), i.address(), g.scopeId(), s.address());
} else {
leaveGroup(intValue(), g.address(), i.address(), g.scopeId());
}
}
void setTcpDeferAccept(int deferAccept) throws IOException {
setTcpDeferAccept(intValue(), deferAccept);
}
@ -107,6 +157,10 @@ final class LinuxSocket extends Socket {
setIpRecvOrigDestAddr(intValue(), enabled ? 1 : 0);
}
int getTimeToLive() throws IOException {
return getTimeToLive(intValue());
}
void getTcpInfo(EpollTcpInfo info) throws IOException {
getTcpInfo(intValue(), info.info);
}
@ -180,6 +234,21 @@ final class LinuxSocket extends Socket {
return ioResult("sendfile", (int) res, SENDFILE_CONNECTION_RESET_EXCEPTION, SENDFILE_CLOSED_CHANNEL_EXCEPTION);
}
private InetAddress deriveInetAddress(NetworkInterface netInterface, boolean ipv6) throws IOException {
final InetAddress ipAny = ipv6 ? INET6_ANY : INET_ANY;
if (netInterface != null) {
final Enumeration<InetAddress> ias = netInterface.getInetAddresses();
while (ias.hasMoreElements()) {
final InetAddress ia = ias.nextElement();
final boolean isV6 = ia instanceof Inet6Address;
if (isV6 == ipv6) {
return ia;
}
}
}
return ipAny;
}
public static LinuxSocket newSocketStream() {
return new LinuxSocket(newSocketStream0());
}
@ -192,6 +261,22 @@ final class LinuxSocket extends Socket {
return new LinuxSocket(newSocketDomain0());
}
private static InetAddress unsafeInetAddrByName(String inetName) {
try {
return InetAddress.getByName(inetName);
} catch (UnknownHostException uhe) {
throw new ChannelException(uhe);
}
}
private static native void joinGroup(int fd, byte[] group, byte[] interfaceAddress,
int scopeId) throws IOException;
private static native void joinSsmGroup(int fd, byte[] group, byte[] interfaceAddress,
int scopeId, byte[] source) throws IOException;
private static native void leaveGroup(int fd, byte[] group, byte[] interfaceAddress,
int scopeId) throws IOException;
private static native void leaveSsmGroup(int fd, byte[] group, byte[] interfaceAddress,
int scopeId, byte[] source) throws IOException;
private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset,
long offset, long length) throws IOException;
@ -204,6 +289,7 @@ final class LinuxSocket extends Socket {
private static native int getTcpKeepIntvl(int fd) throws IOException;
private static native int getTcpKeepCnt(int fd) throws IOException;
private static native int getTcpUserTimeout(int fd) throws IOException;
private static native int getTimeToLive(int fd) throws IOException;
private static native int isIpFreeBind(int fd) throws IOException;
private static native int isIpTransparent(int fd) throws IOException;
private static native int isIpRecvOrigDestAddr(int fd) throws IOException;
@ -226,4 +312,6 @@ final class LinuxSocket extends Socket {
private static native void setIpTransparent(int fd, int transparent) throws IOException;
private static native void setIpRecvOrigDestAddr(int fd, int transparent) throws IOException;
private static native void setTcpMd5Sig(int fd, byte[] address, int scopeId, byte[] key) throws IOException;
private static native void setInterface(int fd, byte[] interfaceAddress, int scopeId) throws IOException;
private static native void setTimeToLive(int fd, int ttl) throws IOException;
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramMulticastTest;
import java.util.List;
public class EpollDatagramMulticastTest extends DatagramMulticastTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram();
}
public void testMulticast(Bootstrap sb, Bootstrap cb) throws Throwable {
super.testMulticast(sb, cb);
}
}