Update DatagramPacket.recipient() to return the actual destination IP (#7879)

Motivation:

DatagramPacket.recipient() doesn't return the actual destination IP, but the IP the app is bound to.

Modification:

- IP_RECVORIGDSTADDR option is enabled for UDP sockets, which allows retrieval of ancillary information containing the original recipient.
- _recvFrom(...) function from transport-native-unix-common/src/main/c/netty_unix_socket.c is modified such that if IP_RECVORIGDSTADDR is set, recvmsg is used instead of recvfrom; enabling the retrieval of the original recipient.
- DatagramSocketAddress also contains a 'local' address, representing the recipient.
- EpollDatagramChannel is updated to return the retrieved recipient address instead of the address the channel is bound to.

Result:

Fixes #4950.
This commit is contained in:
Devrim Şahin 2018-04-26 09:00:36 +03:00 committed by Norman Maurer
parent f4d7e8de14
commit b818852cdb
9 changed files with 123 additions and 9 deletions

View File

@ -277,6 +277,10 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
for (byte b : bytes) { for (byte b : bytes) {
assertEquals(b, buf.readByte()); assertEquals(b, buf.readByte());
} }
// Test that the channel's localAddress is equal to the message's recipient
assertEquals(ctx.channel().localAddress(), msg.recipient());
latch.countDown(); latch.countDown();
} }
}); });

View File

@ -107,6 +107,10 @@ static void netty_epoll_linuxsocket_setIpTransparent(JNIEnv* env, jclass clazz,
netty_unix_socket_setOption(env, fd, SOL_IP, IP_TRANSPARENT, &optval, sizeof(optval)); netty_unix_socket_setOption(env, fd, SOL_IP, IP_TRANSPARENT, &optval, sizeof(optval));
} }
static void netty_epoll_linuxsocket_setIpRecvOrigDestAddr(JNIEnv* env, jclass clazz, jint fd, jint optval) {
netty_unix_socket_setOption(env, fd, IPPROTO_IP, IP_RECVORIGDSTADDR, &optval, sizeof(optval));
}
static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jbyteArray key) { static void netty_epoll_linuxsocket_setTcpMd5Sig(JNIEnv* env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jbyteArray key) {
struct sockaddr_storage addr; struct sockaddr_storage addr;
socklen_t addrSize; socklen_t addrSize;
@ -193,6 +197,14 @@ static jint netty_epoll_linuxsocket_isIpTransparent(JNIEnv* env, jclass clazz, j
return optval; return optval;
} }
static jint netty_epoll_linuxsocket_isIpRecvOrigDestAddr(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_RECVORIGDSTADDR, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
static void netty_epoll_linuxsocket_getTcpInfo(JNIEnv* env, jclass clazz, jint fd, jlongArray array) { static void netty_epoll_linuxsocket_getTcpInfo(JNIEnv* env, jclass clazz, jint fd, jlongArray array) {
struct tcp_info tcp_info; struct tcp_info tcp_info;
if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_INFO, &tcp_info, sizeof(tcp_info)) == -1) { if (netty_unix_socket_getOption(env, fd, IPPROTO_TCP, TCP_INFO, &tcp_info, sizeof(tcp_info)) == -1) {
@ -345,12 +357,14 @@ static const JNINativeMethod fixed_method_table[] = {
{ "setTcpUserTimeout", "(II)V", (void *) netty_epoll_linuxsocket_setTcpUserTimeout }, { "setTcpUserTimeout", "(II)V", (void *) netty_epoll_linuxsocket_setTcpUserTimeout },
{ "setIpFreeBind", "(II)V", (void *) netty_epoll_linuxsocket_setIpFreeBind }, { "setIpFreeBind", "(II)V", (void *) netty_epoll_linuxsocket_setIpFreeBind },
{ "setIpTransparent", "(II)V", (void *) netty_epoll_linuxsocket_setIpTransparent }, { "setIpTransparent", "(II)V", (void *) netty_epoll_linuxsocket_setIpTransparent },
{ "setIpRecvOrigDestAddr", "(II)V", (void *) netty_epoll_linuxsocket_setIpRecvOrigDestAddr },
{ "getTcpKeepIdle", "(I)I", (void *) netty_epoll_linuxsocket_getTcpKeepIdle }, { "getTcpKeepIdle", "(I)I", (void *) netty_epoll_linuxsocket_getTcpKeepIdle },
{ "getTcpKeepIntvl", "(I)I", (void *) netty_epoll_linuxsocket_getTcpKeepIntvl }, { "getTcpKeepIntvl", "(I)I", (void *) netty_epoll_linuxsocket_getTcpKeepIntvl },
{ "getTcpKeepCnt", "(I)I", (void *) netty_epoll_linuxsocket_getTcpKeepCnt }, { "getTcpKeepCnt", "(I)I", (void *) netty_epoll_linuxsocket_getTcpKeepCnt },
{ "getTcpUserTimeout", "(I)I", (void *) netty_epoll_linuxsocket_getTcpUserTimeout }, { "getTcpUserTimeout", "(I)I", (void *) netty_epoll_linuxsocket_getTcpUserTimeout },
{ "isIpFreeBind", "(I)I", (void *) netty_epoll_linuxsocket_isIpFreeBind }, { "isIpFreeBind", "(I)I", (void *) netty_epoll_linuxsocket_isIpFreeBind },
{ "isIpTransparent", "(I)I", (void *) netty_epoll_linuxsocket_isIpTransparent }, { "isIpTransparent", "(I)I", (void *) netty_epoll_linuxsocket_isIpTransparent },
{ "isIpRecvOrigDestAddr", "(I)I", (void *) netty_epoll_linuxsocket_isIpRecvOrigDestAddr },
{ "getTcpInfo", "(I[J)V", (void *) netty_epoll_linuxsocket_getTcpInfo }, { "getTcpInfo", "(I[J)V", (void *) netty_epoll_linuxsocket_getTcpInfo },
{ "setTcpMd5Sig", "(I[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig } { "setTcpMd5Sig", "(I[BI[B)V", (void *) netty_epoll_linuxsocket_setTcpMd5Sig }
// "sendFile" has a dynamic signature // "sendFile" has a dynamic signature

View File

@ -31,6 +31,7 @@ public final class EpollChannelOption<T> extends UnixChannelOption<T> {
valueOf(EpollChannelOption.class, "TCP_USER_TIMEOUT"); valueOf(EpollChannelOption.class, "TCP_USER_TIMEOUT");
public static final ChannelOption<Boolean> IP_FREEBIND = valueOf("IP_FREEBIND"); public static final ChannelOption<Boolean> IP_FREEBIND = valueOf("IP_FREEBIND");
public static final ChannelOption<Boolean> IP_TRANSPARENT = valueOf("IP_TRANSPARENT"); public static final ChannelOption<Boolean> IP_TRANSPARENT = valueOf("IP_TRANSPARENT");
public static final ChannelOption<Boolean> IP_RECVORIGDSTADDR = valueOf("IP_RECVORIGDSTADDR");
public static final ChannelOption<Integer> TCP_FASTOPEN = valueOf(EpollChannelOption.class, "TCP_FASTOPEN"); public static final ChannelOption<Integer> TCP_FASTOPEN = valueOf(EpollChannelOption.class, "TCP_FASTOPEN");
public static final ChannelOption<Boolean> TCP_FASTOPEN_CONNECT = public static final ChannelOption<Boolean> TCP_FASTOPEN_CONNECT =
valueOf(EpollChannelOption.class, "TCP_FASTOPEN_CONNECT"); valueOf(EpollChannelOption.class, "TCP_FASTOPEN_CONNECT");

View File

@ -470,13 +470,18 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
break; break;
} }
InetSocketAddress localAddress = remoteAddress.localAddress();
if (localAddress == null) {
localAddress = (InetSocketAddress) localAddress();
}
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
allocHandle.lastBytesRead(remoteAddress.receivedAmount()); allocHandle.lastBytesRead(remoteAddress.receivedAmount());
data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()); data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
readPending = false; readPending = false;
pipeline.fireChannelRead( pipeline.fireChannelRead(
new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); new DatagramPacket(data, localAddress, remoteAddress));
data = null; data = null;
} while (allocHandle.continueReading()); } while (allocHandle.continueReading());

View File

@ -49,7 +49,8 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED, ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED,
ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL,
ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION,
EpollChannelOption.SO_REUSEPORT, EpollChannelOption.IP_TRANSPARENT); EpollChannelOption.SO_REUSEPORT, EpollChannelOption.IP_TRANSPARENT,
EpollChannelOption.IP_RECVORIGDSTADDR);
} }
@SuppressWarnings({ "unchecked", "deprecation" }) @SuppressWarnings({ "unchecked", "deprecation" })
@ -91,6 +92,9 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
if (option == EpollChannelOption.IP_TRANSPARENT) { if (option == EpollChannelOption.IP_TRANSPARENT) {
return (T) Boolean.valueOf(isIpTransparent()); return (T) Boolean.valueOf(isIpTransparent());
} }
if (option == EpollChannelOption.IP_RECVORIGDSTADDR) {
return (T) Boolean.valueOf(isIpRecvOrigDestAddr());
}
return super.getOption(option); return super.getOption(option);
} }
@ -123,6 +127,8 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
setReusePort((Boolean) value); setReusePort((Boolean) value);
} else if (option == EpollChannelOption.IP_TRANSPARENT) { } else if (option == EpollChannelOption.IP_TRANSPARENT) {
setIpTransparent((Boolean) value); setIpTransparent((Boolean) value);
} else if (option == EpollChannelOption.IP_RECVORIGDSTADDR) {
setIpRecvOrigDestAddr((Boolean) value);
} else { } else {
return super.setOption(option, value); return super.setOption(option, value);
} }
@ -403,4 +409,29 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
} }
} }
/**
* Returns {@code true} if <a href="http://man7.org/linux/man-pages/man7/ip.7.html">IP_RECVORIGDSTADDR</a> is
* enabled, {@code false} otherwise.
*/
public boolean isIpRecvOrigDestAddr() {
try {
return datagramChannel.socket.isIpRecvOrigDestAddr();
} catch (IOException e) {
throw new ChannelException(e);
}
}
/**
* If {@code true} is used <a href="http://man7.org/linux/man-pages/man7/ip.7.html">IP_RECVORIGDSTADDR</a> is
* enabled, {@code false} for disable it. Default is disabled.
*/
public EpollDatagramChannelConfig setIpRecvOrigDestAddr(boolean ipTransparent) {
try {
datagramChannel.socket.setIpRecvOrigDestAddr(ipTransparent);
return this;
} catch (IOException e) {
throw new ChannelException(e);
}
}
} }

View File

@ -99,6 +99,10 @@ final class LinuxSocket extends Socket {
setIpTransparent(intValue(), enabled ? 1 : 0); setIpTransparent(intValue(), enabled ? 1 : 0);
} }
void setIpRecvOrigDestAddr(boolean enabled) throws IOException {
setIpRecvOrigDestAddr(intValue(), enabled ? 1 : 0);
}
void getTcpInfo(EpollTcpInfo info) throws IOException { void getTcpInfo(EpollTcpInfo info) throws IOException {
getTcpInfo(intValue(), info.info); getTcpInfo(intValue(), info.info);
} }
@ -148,6 +152,10 @@ final class LinuxSocket extends Socket {
return isIpTransparent(intValue()) != 0; return isIpTransparent(intValue()) != 0;
} }
boolean isIpRecvOrigDestAddr() throws IOException {
return isIpRecvOrigDestAddr(intValue()) != 0;
}
PeerCredentials getPeerCredentials() throws IOException { PeerCredentials getPeerCredentials() throws IOException {
return getPeerCredentials(intValue()); return getPeerCredentials(intValue());
} }
@ -189,6 +197,7 @@ final class LinuxSocket extends Socket {
private static native int getTcpUserTimeout(int fd) throws IOException; private static native int getTcpUserTimeout(int fd) throws IOException;
private static native int isIpFreeBind(int fd) throws IOException; private static native int isIpFreeBind(int fd) throws IOException;
private static native int isIpTransparent(int fd) throws IOException; private static native int isIpTransparent(int fd) throws IOException;
private static native int isIpRecvOrigDestAddr(int fd) throws IOException;
private static native void getTcpInfo(int fd, long[] array) throws IOException; private static native void getTcpInfo(int fd, long[] array) throws IOException;
private static native PeerCredentials getPeerCredentials(int fd) throws IOException; private static native PeerCredentials getPeerCredentials(int fd) throws IOException;
private static native int isTcpFastOpenConnect(int fd) throws IOException; private static native int isTcpFastOpenConnect(int fd) throws IOException;
@ -205,5 +214,6 @@ final class LinuxSocket extends Socket {
private static native void setTcpUserTimeout(int fd, int milliseconds)throws IOException; private static native void setTcpUserTimeout(int fd, int milliseconds)throws IOException;
private static native void setIpFreeBind(int fd, int freeBind) throws IOException; private static native void setIpFreeBind(int fd, int freeBind) throws IOException;
private static native void setIpTransparent(int fd, int transparent) throws IOException; private static native void setIpTransparent(int fd, int transparent) throws IOException;
private static native void setIpRecvOrigDestAddr(int fd, int transparent) throws IOException;
private static native void setTcpMd5Sig(int fd, byte[] address, int scopeId, byte[] key) throws IOException; private static native void setTcpMd5Sig(int fd, byte[] address, int scopeId, byte[] key) throws IOException;
} }

View File

@ -26,4 +26,11 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() { protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram(); return EpollSocketTestPermutation.INSTANCE.datagram();
} }
public void testSimpleSendWithConnect(Bootstrap sb, Bootstrap cb) throws Throwable {
// Run this test with IP_RECVORIGDSTADDR option enabled
sb.option(EpollChannelOption.IP_RECVORIGDSTADDR, true);
super.testSimpleSendWithConnect(sb, cb);
sb.option(EpollChannelOption.IP_RECVORIGDSTADDR, false);
}
} }

View File

@ -68,7 +68,7 @@ static int nettyNonBlockingSocket(int domain, int type, int protocol) {
#endif #endif
} }
static jobject createDatagramSocketAddress(JNIEnv* env, const struct sockaddr_storage* addr, int len) { static jobject createDatagramSocketAddress(JNIEnv* env, const struct sockaddr_storage* addr, int len, jobject local) {
char ipstr[INET6_ADDRSTRLEN]; char ipstr[INET6_ADDRSTRLEN];
int port; int port;
jstring ipString; jstring ipString;
@ -93,7 +93,7 @@ static jobject createDatagramSocketAddress(JNIEnv* env, const struct sockaddr_st
ipString = (*env)->NewStringUTF(env, ipstr); ipString = (*env)->NewStringUTF(env, ipstr);
} }
} }
jobject socketAddr = (*env)->NewObject(env, datagramSocketAddressClass, datagramSocketAddrMethodId, ipString, port, len); jobject socketAddr = (*env)->NewObject(env, datagramSocketAddressClass, datagramSocketAddrMethodId, ipString, port, len, local);
return socketAddr; return socketAddr;
} }
@ -289,12 +289,39 @@ static jint _sendTo(JNIEnv* env, jint fd, void* buffer, jint pos, jint limit, jb
static jobject _recvFrom(JNIEnv* env, jint fd, void* buffer, jint pos, jint limit) { static jobject _recvFrom(JNIEnv* env, jint fd, void* buffer, jint pos, jint limit) {
struct sockaddr_storage addr; struct sockaddr_storage addr;
struct sockaddr_storage daddr;
socklen_t addrlen = sizeof(addr); socklen_t addrlen = sizeof(addr);
char cntrlbuf[64];
struct iovec iov;
struct msghdr msg;
ssize_t res; ssize_t res;
int err; int err;
struct cmsghdr* cmsg;
jobject local = NULL;
int readLocalAddr;
if (netty_unix_socket_getOption(env, fd, IPPROTO_IP, IP_RECVORIGDSTADDR,
&readLocalAddr, sizeof(readLocalAddr)) < 0) {
readLocalAddr = 0;
}
if (readLocalAddr) {
iov.iov_base = buffer + pos;
iov.iov_len = (size_t) (limit - pos);
msg.msg_name = (struct sockaddr*) &addr;
msg.msg_namelen = addrlen;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = cntrlbuf;
msg.msg_controllen = sizeof(cntrlbuf);
}
do { do {
res = recvfrom(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr*) &addr, &addrlen); if (readLocalAddr) {
res = recvmsg(fd, &msg, 0);
} else {
res = recvfrom(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr*) &addr, &addrlen);
}
// Keep on reading if we was interrupted // Keep on reading if we was interrupted
} while (res == -1 && ((err = errno) == EINTR)); } while (res == -1 && ((err = errno) == EINTR));
@ -315,7 +342,16 @@ static jobject _recvFrom(JNIEnv* env, jint fd, void* buffer, jint pos, jint limi
return NULL; return NULL;
} }
return createDatagramSocketAddress(env, &addr, res); if (readLocalAddr) {
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVORIGDSTADDR) {
memcpy (&daddr, CMSG_DATA(cmsg), sizeof (struct sockaddr_storage));
local = createDatagramSocketAddress(env, &daddr, res, NULL);
break;
}
}
}
return createDatagramSocketAddress(env, &addr, res, local);
} }
void netty_unix_socket_getOptionHandleError(JNIEnv* env, int err) { void netty_unix_socket_getOptionHandleError(JNIEnv* env, int err) {
@ -977,9 +1013,9 @@ jint netty_unix_socket_JNI_OnLoad(JNIEnv* env, const char* packagePrefix) {
netty_unix_errors_throwOutOfMemoryError(env); netty_unix_errors_throwOutOfMemoryError(env);
return JNI_ERR; return JNI_ERR;
} }
datagramSocketAddrMethodId = (*env)->GetMethodID(env, datagramSocketAddressClass, "<init>", "(Ljava/lang/String;II)V"); datagramSocketAddrMethodId = (*env)->GetMethodID(env, datagramSocketAddressClass, "<init>", "(Ljava/lang/String;IILio/netty/channel/unix/DatagramSocketAddress;)V");
if (datagramSocketAddrMethodId == NULL) { if (datagramSocketAddrMethodId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get method ID: DatagramSocketAddress.<init>(String, int, int)"); netty_unix_errors_throwRuntimeException(env, "failed to get method ID: DatagramSocketAddress.<init>(String, int, int, DatagramSocketAddress)");
return JNI_ERR; return JNI_ERR;
} }
jclass localInetSocketAddressClass = (*env)->FindClass(env, "java/net/InetSocketAddress"); jclass localInetSocketAddressClass = (*env)->FindClass(env, "java/net/InetSocketAddress");

View File

@ -28,10 +28,16 @@ public final class DatagramSocketAddress extends InetSocketAddress {
// holds the amount of received bytes // holds the amount of received bytes
private final int receivedAmount; private final int receivedAmount;
private final DatagramSocketAddress localAddress;
DatagramSocketAddress(String addr, int port, int receivedAmount) { DatagramSocketAddress(String addr, int port, int receivedAmount, DatagramSocketAddress local) {
super(addr, port); super(addr, port);
this.receivedAmount = receivedAmount; this.receivedAmount = receivedAmount;
localAddress = local;
}
public DatagramSocketAddress localAddress() {
return localAddress;
} }
public int receivedAmount() { public int receivedAmount() {