Correctly handle ipv6 mapped ipv4 addresses when using recvmmsg (#9541)

Motivation:

394a1b3485 introduced the possibility to use recvmmsg(...) but did not correctly handle ipv6 mapped ip4 addresses to make it consistent with other transports.

Modifications:

- Correctly handle ipv6 mapped ipv4 addresses by only copy over the relevant bytes
- Small improvement on how to detect ipv6 mapped ipv4 addresses by using memcmp and not byte by byte compare
- Adjust test to cover this bug

Result:

Correctly handle ipv6 mapped ipv4 addresses
This commit is contained in:
Norman Maurer 2019-09-06 13:54:29 +02:00
parent 0280bd2063
commit d57a5f5d4f
6 changed files with 60 additions and 18 deletions

View File

@ -30,6 +30,7 @@ import io.netty.channel.socket.DatagramPacket;
import org.junit.Test; import org.junit.Test;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -157,14 +158,19 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
} }
}); });
final CountDownLatch latch = new CountDownLatch(count); final SocketAddress sender;
sc = setupServerChannel(sb, bytes, latch, false);
if (bindClient) { if (bindClient) {
cc = cb.bind(newSocketAddress()).sync().channel(); cc = cb.bind(newSocketAddress()).sync().channel();
sender = cc.localAddress();
} else { } else {
cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true); cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
cc = cb.register().sync().channel(); cc = cb.register().sync().channel();
sender = null;
} }
final CountDownLatch latch = new CountDownLatch(count);
sc = setupServerChannel(sb, bytes, sender, latch, false);
InetSocketAddress addr = (InetSocketAddress) sc.localAddress(); InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
switch (wrapType) { switch (wrapType) {
@ -231,8 +237,10 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
DatagramChannel cc = null; DatagramChannel cc = null;
try { try {
final CountDownLatch latch = new CountDownLatch(count); final CountDownLatch latch = new CountDownLatch(count);
sc = setupServerChannel(sb, bytes, latch, true); cc = (DatagramChannel) cb.bind(newSocketAddress()).sync().channel();
cc = (DatagramChannel) cb.connect(sc.localAddress()).sync().channel(); sc = setupServerChannel(sb, bytes, cc.localAddress(), latch, true);
cc.connect(sc.localAddress()).syncUninterruptibly();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
switch (wrapType) { switch (wrapType) {
@ -275,7 +283,8 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final CountDownLatch latch, final boolean echo) private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final SocketAddress sender,
final CountDownLatch latch, final boolean echo)
throws Throwable { throws Throwable {
sb.handler(new ChannelInitializer<Channel>() { sb.handler(new ChannelInitializer<Channel>() {
@Override @Override
@ -283,6 +292,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() { ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
if (sender == null) {
assertNotNull(msg.sender());
} else {
assertEquals(sender, msg.sender());
}
ByteBuf buf = msg.content(); ByteBuf buf = msg.content();
assertEquals(bytes.length, buf.readableBytes()); assertEquals(bytes.length, buf.readableBytes());
for (int i = 0; i < bytes.length; i++) { for (int i = 0; i < bytes.length; i++) {

View File

@ -67,6 +67,7 @@ struct mmsghdr {
// Those are initialized in the init(...) method and cached for performance reasons // Those are initialized in the init(...) method and cached for performance reasons
static jfieldID packetAddrFieldId = NULL; static jfieldID packetAddrFieldId = NULL;
static jfieldID packetAddrLenFieldId = NULL;
static jfieldID packetScopeIdFieldId = NULL; static jfieldID packetScopeIdFieldId = NULL;
static jfieldID packetPortFieldId = NULL; static jfieldID packetPortFieldId = NULL;
static jfieldID packetMemoryAddressFieldId = NULL; static jfieldID packetMemoryAddressFieldId = NULL;
@ -362,12 +363,20 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr; struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr;
(*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr); (*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr);
(*env)->SetIntField(env, packet, packetAddrLenFieldId, 4);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, 0); (*env)->SetIntField(env, packet, packetScopeIdFieldId, 0);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port)); (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port));
} else { } else {
int addrLen = netty_unix_socket_ipAddressLength(addr);
struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr; struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr;
if (addrLen == 4) {
// IPV4 mapped IPV6 address
(*env)->SetByteArrayRegion(env, address, 12, 4, (jbyte*) &ip6addr->sin6_addr.s6_addr);
} else {
(*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr); (*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr);
}
(*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id); (*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port)); (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port));
} }
@ -605,6 +614,11 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: NativeDatagramPacket.addr"); netty_unix_errors_throwRuntimeException(env, "failed to get field ID: NativeDatagramPacket.addr");
goto error; goto error;
} }
packetAddrLenFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "addrLen", "I");
if (packetAddrLenFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: NativeDatagramPacket.addrLen");
goto error;
}
packetScopeIdFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "scopeId", "I"); packetScopeIdFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "scopeId", "I");
if (packetScopeIdFieldId == NULL) { if (packetScopeIdFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: NativeDatagramPacket.scopeId"); netty_unix_errors_throwRuntimeException(env, "failed to get field ID: NativeDatagramPacket.scopeId");
@ -649,6 +663,7 @@ error:
netty_epoll_linuxsocket_JNI_OnUnLoad(env); netty_epoll_linuxsocket_JNI_OnUnLoad(env);
} }
packetAddrFieldId = NULL; packetAddrFieldId = NULL;
packetAddrLenFieldId = NULL;
packetScopeIdFieldId = NULL; packetScopeIdFieldId = NULL;
packetPortFieldId = NULL; packetPortFieldId = NULL;
packetMemoryAddressFieldId = NULL; packetMemoryAddressFieldId = NULL;
@ -666,6 +681,7 @@ static void netty_epoll_native_JNI_OnUnLoad(JNIEnv* env) {
netty_epoll_linuxsocket_JNI_OnUnLoad(env); netty_epoll_linuxsocket_JNI_OnUnLoad(env);
packetAddrFieldId = NULL; packetAddrFieldId = NULL;
packetAddrLenFieldId = NULL;
packetScopeIdFieldId = NULL; packetScopeIdFieldId = NULL;
packetPortFieldId = NULL; packetPortFieldId = NULL;
packetMemoryAddressFieldId = NULL; packetMemoryAddressFieldId = NULL;

View File

@ -40,6 +40,10 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
// We share one IovArray for all NativeDatagramPackets to reduce memory overhead. This will allow us to write // We share one IovArray for all NativeDatagramPackets to reduce memory overhead. This will allow us to write
// up to IOV_MAX iovec across all messages in one sendmmsg(...) call. // up to IOV_MAX iovec across all messages in one sendmmsg(...) call.
private final IovArray iovArray = new IovArray(); private final IovArray iovArray = new IovArray();
// temporary array to copy the ipv4 part of ipv6-mapped-ipv4 addresses and then create a Inet4Address out of it.
private final byte[] ipv4Bytes = new byte[4];
private int count; private int count;
NativeDatagramPacketArray() { NativeDatagramPacketArray() {
@ -110,13 +114,14 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
* Used to pass needed data to JNI. * Used to pass needed data to JNI.
*/ */
@SuppressWarnings("unused") @SuppressWarnings("unused")
static final class NativeDatagramPacket { final class NativeDatagramPacket {
// This is the actual struct iovec* // This is the actual struct iovec*
private long memoryAddress; private long memoryAddress;
private int count; private int count;
private final byte[] addr = new byte[16]; private final byte[] addr = new byte[16];
private int addrLen; private int addrLen;
private int scopeId; private int scopeId;
private int port; private int port;
@ -145,10 +150,11 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr
DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress localAddress) throws UnknownHostException { DatagramPacket newDatagramPacket(ByteBuf buffer, InetSocketAddress localAddress) throws UnknownHostException {
final InetAddress address; final InetAddress address;
if (scopeId != 0) { if (addrLen == ipv4Bytes.length) {
address = Inet6Address.getByAddress(null, addr, scopeId); System.arraycopy(addr, 0, ipv4Bytes, 0, addrLen);
address = InetAddress.getByAddress(ipv4Bytes);
} else { } else {
address = InetAddress.getByAddress(addr); address = Inet6Address.getByAddress(null, addr, scopeId);
} }
return new DatagramPacket(buffer.writerIndex(count), return new DatagramPacket(buffer.writerIndex(count),
localAddress, new InetSocketAddress(address, port)); localAddress, new InetSocketAddress(address, port));

View File

@ -28,6 +28,7 @@ import io.netty.testsuite.transport.socket.AbstractDatagramTest;
import org.junit.Test; import org.junit.Test;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -82,6 +83,9 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
// Nothing will be sent. // Nothing will be sent.
} }
}); });
cc = cb.bind(newSocketAddress()).sync().channel();
final SocketAddress ccAddress = cc.localAddress();
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>(); final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
final byte[] bytes = new byte[packetSize]; final byte[] bytes = new byte[packetSize];
ThreadLocalRandom.current().nextBytes(bytes); ThreadLocalRandom.current().nextBytes(bytes);
@ -98,6 +102,8 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) { protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
assertEquals(ccAddress, msg.sender());
assertEquals(bytes.length, msg.content().readableBytes()); assertEquals(bytes.length, msg.content().readableBytes());
byte[] receivedBytes = new byte[bytes.length]; byte[] receivedBytes = new byte[bytes.length];
msg.content().readBytes(receivedBytes); msg.content().readBytes(receivedBytes);
@ -115,7 +121,6 @@ public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
sb.option(ChannelOption.AUTO_READ, false); sb.option(ChannelOption.AUTO_READ, false);
sc = sb.bind(newSocketAddress()).sync().channel(); sc = sb.bind(newSocketAddress()).sync().channel();
cc = cb.bind(newSocketAddress()).sync().channel();
InetSocketAddress addr = (InetSocketAddress) sc.localAddress(); InetSocketAddress addr = (InetSocketAddress) sc.localAddress();

View File

@ -43,6 +43,7 @@ static jclass inetSocketAddressClass = NULL;
static int socketType = AF_INET; static int socketType = AF_INET;
static const unsigned char wildcardAddress[] = { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; static const unsigned char wildcardAddress[] = { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };
static const unsigned char ipv4MappedWildcardAddress[] = { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00 }; static const unsigned char ipv4MappedWildcardAddress[] = { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00 };
static const unsigned char ipv4MappedAddress[] = { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff };
// Optional external methods // Optional external methods
extern int accept4(int sockFd, struct sockaddr* addr, socklen_t* addrlen, int flags) __attribute__((weak)) __attribute__((weak_import)); extern int accept4(int sockFd, struct sockaddr* addr, socklen_t* addrlen, int flags) __attribute__((weak)) __attribute__((weak_import));
@ -67,14 +68,12 @@ static int nettyNonBlockingSocket(int domain, int type, int protocol) {
#endif #endif
} }
static int ipAddressLength(const struct sockaddr_storage* addr) { int netty_unix_socket_ipAddressLength(const struct sockaddr_storage* addr) {
if (addr->ss_family == AF_INET) { if (addr->ss_family == AF_INET) {
return 4; return 4;
} }
struct sockaddr_in6* s = (struct sockaddr_in6*) addr; struct sockaddr_in6* s = (struct sockaddr_in6*) addr;
if (s->sin6_addr.s6_addr[11] == 0xff && s->sin6_addr.s6_addr[10] == 0xff && if (memcmp(s->sin6_addr.s6_addr, ipv4MappedAddress, 12) == 0) {
s->sin6_addr.s6_addr[9] == 0x00 && s->sin6_addr.s6_addr[8] == 0x00 && s->sin6_addr.s6_addr[7] == 0x00 && s->sin6_addr.s6_addr[6] == 0x00 && s->sin6_addr.s6_addr[5] == 0x00 &&
s->sin6_addr.s6_addr[4] == 0x00 && s->sin6_addr.s6_addr[3] == 0x00 && s->sin6_addr.s6_addr[2] == 0x00 && s->sin6_addr.s6_addr[1] == 0x00 && s->sin6_addr.s6_addr[0] == 0x00) {
// IPv4-mapped-on-IPv6 // IPv4-mapped-on-IPv6
return 4; return 4;
} }
@ -84,7 +83,7 @@ static int ipAddressLength(const struct sockaddr_storage* addr) {
static jobject createDatagramSocketAddress(JNIEnv* env, const struct sockaddr_storage* addr, int len, jobject local) { static jobject createDatagramSocketAddress(JNIEnv* env, const struct sockaddr_storage* addr, int len, jobject local) {
int port; int port;
int scopeId; int scopeId;
int ipLength = ipAddressLength(addr); int ipLength = netty_unix_socket_ipAddressLength(addr);
jbyteArray addressBytes = (*env)->NewByteArray(env, ipLength); jbyteArray addressBytes = (*env)->NewByteArray(env, ipLength);
if (addr->ss_family == AF_INET) { if (addr->ss_family == AF_INET) {
@ -114,7 +113,7 @@ static jobject createDatagramSocketAddress(JNIEnv* env, const struct sockaddr_st
} }
static jsize addressLength(const struct sockaddr_storage* addr) { static jsize addressLength(const struct sockaddr_storage* addr) {
int len = ipAddressLength(addr); int len = netty_unix_socket_ipAddressLength(addr);
if (len == 4) { if (len == 4) {
// Only encode port into it // Only encode port into it
return len + 4; return len + 4;

View File

@ -23,6 +23,7 @@
int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray address, jint scopeId, jint jport, const struct sockaddr_storage* addr, socklen_t* addrSize); int netty_unix_socket_initSockaddr(JNIEnv* env, jboolean ipv6, jbyteArray address, jint scopeId, jint jport, const struct sockaddr_storage* addr, socklen_t* addrSize);
int netty_unix_socket_getOption(JNIEnv* env, jint fd, int level, int optname, void* optval, socklen_t optlen); int netty_unix_socket_getOption(JNIEnv* env, jint fd, int level, int optname, void* optval, socklen_t optlen);
int netty_unix_socket_setOption(JNIEnv* env, jint fd, int level, int optname, const void* optval, socklen_t len); int netty_unix_socket_setOption(JNIEnv* env, jint fd, int level, int optname, const void* optval, socklen_t len);
int netty_unix_socket_ipAddressLength(const struct sockaddr_storage* addr);
// These method is sometimes needed if you want to special handle some errno value before throwing an exception. // These method is sometimes needed if you want to special handle some errno value before throwing an exception.
int netty_unix_socket_getOption0(jint fd, int level, int optname, void* optval, socklen_t optlen); int netty_unix_socket_getOption0(jint fd, int level, int optname, void* optval, socklen_t optlen);