diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java index 6b009e7d27..817b75e128 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java @@ -32,6 +32,8 @@ import org.junit.Test; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.NotYetConnectedException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -242,25 +244,16 @@ public class DatagramUnicastTest extends AbstractDatagramTest { cc.connect(sc.localAddress()).syncUninterruptibly(); + List futures = new ArrayList(); for (int i = 0; i < count; i++) { - switch (wrapType) { - case DUP: - cc.write(buf.retainedDuplicate()); - break; - case SLICE: - cc.write(buf.retainedSlice()); - break; - case READ_ONLY: - cc.write(buf.retain().asReadOnly()); - break; - case NONE: - cc.write(buf.retain()); - break; - default: - throw new Error("unknown wrap type: " + wrapType); - } + futures.add(write(cc, buf, wrapType)); } cc.flush(); + + for (ChannelFuture future: futures) { + future.sync(); + } + assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(clientLatch.await(10, TimeUnit.SECONDS)); assertTrue(cc.isConnected()); @@ -282,6 +275,21 @@ public class DatagramUnicastTest extends AbstractDatagramTest { } } + private static ChannelFuture write(Channel cc, ByteBuf buf, WrapType wrapType) { + switch (wrapType) { + case DUP: + return cc.write(buf.retainedDuplicate()); + case SLICE: + return cc.write(buf.retainedSlice()); + case READ_ONLY: + return cc.write(buf.retain().asReadOnly()); + case NONE: + return cc.write(buf.retain()); + default: + throw new Error("unknown wrap type: " + wrapType); + } + } + @SuppressWarnings("deprecation") private Channel setupServerChannel(Bootstrap sb, final byte[] bytes, final SocketAddress sender, final CountDownLatch latch, final boolean echo) diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index 9b4e54f43c..a2f1a31505 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -319,16 +319,19 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset); jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId); - jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId); - jint port = (*env)->GetIntField(env, packet, packetPortFieldId); + jint addrLen = (*env)->GetIntField(env, packet, packetAddrLenFieldId); - if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) { - return -1; + if (addrLen != 0) { + jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId); + jint port = (*env)->GetIntField(env, packet, packetPortFieldId); + + if (netty_unix_socket_initSockaddr(env, ipv6, address, scopeId, port, &addr[i], &addrSize) == -1) { + return -1; + } + msg[i].msg_hdr.msg_name = &addr[i]; + msg[i].msg_hdr.msg_namelen = addrSize; } - msg[i].msg_hdr.msg_name = &addr[i]; - msg[i].msg_hdr.msg_namelen = addrSize; - msg[i].msg_hdr.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId); msg[i].msg_hdr.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index b6a5e0ff47..6c842e3a19 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -311,7 +311,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+ if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) { NativeDatagramPacketArray array = cleanDatagramPacketArray(); - in.forEachFlushedMessage(array); + array.add(in, isConnected()); int cnt = array.count(); if (cnt >= 1) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java index 0e97cd52ac..4b0e6750a0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -17,6 +17,7 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelOutboundBuffer.MessageProcessor; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.unix.IovArray; import io.netty.channel.unix.Limits; @@ -32,7 +33,7 @@ import static io.netty.channel.unix.NativeInetAddress.copyIpv4MappedIpv6Address; /** * Support sendmmsg(...) on linux with GLIBC 2.14+ */ -final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor { +final class NativeDatagramPacketArray { // Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call. private final NativeDatagramPacket[] packets = new NativeDatagramPacket[UIO_MAX_IOV]; @@ -43,6 +44,7 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr // temporary array to copy the ipv4 part of ipv6-mapped-ipv4 addresses and then create a Inet4Address out of it. private final byte[] ipv4Bytes = new byte[4]; + private final MyMessageProcessor processor = new MyMessageProcessor(); private int count; @@ -52,11 +54,6 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr } } - private boolean addReadable(DatagramPacket packet) { - ByteBuf buf = packet.content(); - return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient()); - } - boolean addWritable(ByteBuf buf, int index, int len) { return add0(buf, index, len, null); } @@ -82,9 +79,9 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr return true; } - @Override - public boolean processMessage(Object msg) { - return msg instanceof DatagramPacket && addReadable((DatagramPacket) msg); + void add(ChannelOutboundBuffer buffer, boolean connected) throws Exception { + processor.connected = connected; + buffer.forEachFlushedMessage(processor); } /** @@ -110,6 +107,24 @@ final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessagePr iovArray.release(); } + private final class MyMessageProcessor implements MessageProcessor { + private boolean connected; + + @Override + public boolean processMessage(Object msg) { + if (msg instanceof DatagramPacket) { + DatagramPacket packet = (DatagramPacket) msg; + ByteBuf buf = packet.content(); + return add0(buf, buf.readerIndex(), buf.readableBytes(), packet.recipient()); + } + if (msg instanceof ByteBuf && connected) { + ByteBuf buf = (ByteBuf) msg; + return add0(buf, buf.readerIndex(), buf.readableBytes(), null); + } + return false; + } + } + /** * Used to pass needed data to JNI. */