diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java index 0ab73a9613..c4a1b14829 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java @@ -16,6 +16,8 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -32,11 +34,61 @@ import static org.junit.Assert.*; public class DatagramUnicastTest extends AbstractDatagramTest { @Test - public void testSimpleSend() throws Throwable { + public void testSimpleSendDirectByteBuf() throws Throwable { run(); } - public void testSimpleSend(Bootstrap sb, Bootstrap cb) throws Throwable { + public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { + testSimpleSend0(sb, cb, Unpooled.directBuffer()); + } + + @Test + public void testSimpleSendHeapByteBuf() throws Throwable { + run(); + } + + public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { + testSimpleSend0(sb, cb, Unpooled.directBuffer()); + } + + @Test + public void testSimpleSendCompositeDirectByteBuf() throws Throwable { + run(); + } + + public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { + CompositeByteBuf buf = Unpooled.compositeBuffer(); + buf.addComponent(Unpooled.directBuffer(2, 2)); + buf.addComponent(Unpooled.directBuffer(2, 2)); + testSimpleSend0(sb, cb, buf); + } + + @Test + public void testSimpleSendCompositeHeapByteBuf() throws Throwable { + run(); + } + + public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { + CompositeByteBuf buf = Unpooled.compositeBuffer(); + buf.addComponent(Unpooled.buffer(2, 2)); + buf.addComponent(Unpooled.buffer(2, 2)); + testSimpleSend0(sb, cb, buf); + } + + @Test + public void testSimpleSendCompositeMixedByteBuf() throws Throwable { + run(); + } + + public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { + CompositeByteBuf buf = Unpooled.compositeBuffer(); + buf.addComponent(Unpooled.directBuffer(2, 2)); + buf.addComponent(Unpooled.buffer(2, 2)); + testSimpleSend0(sb, cb, buf); + } + + private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf) throws Throwable { + buf.writeInt(1); final CountDownLatch latch = new CountDownLatch(1); sb.handler(new SimpleChannelInboundHandler() { @@ -57,7 +109,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest { Channel sc = sb.bind().sync().channel(); Channel cc = cb.bind().sync().channel(); - cc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), addr)).sync(); + cc.writeAndFlush(new DatagramPacket(buf, addr)).sync(); assertTrue(latch.await(10, TimeUnit.SECONDS)); sc.close().sync(); diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index 52be419d94..c3b416d8b7 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -655,6 +655,41 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * return sendTo0(env, fd, (void*) memoryAddress, pos, limit, address, scopeId, port); } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) { + struct sockaddr_storage addr; + + if (init_sockaddr(env, address, scopeId, port, &addr) == -1) { + return -1; + } + + struct msghdr m; + m.msg_name = (void*) &addr; + m.msg_namelen = (socklen_t) sizeof(struct sockaddr_storage); + m.msg_iov = (struct iovec *) memoryAddress; + m.msg_iovlen = length; + + ssize_t res; + int err; + do { + res = sendmsg(fd, &m, 0); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + // network stack saturated... try again later + if (err == EAGAIN || err == EWOULDBLOCK) { + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while sendto(...): ", err)); + return -1; + } + return (jint) res; +} + jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) { struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index ac0677f97b..feed7c0e6b 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -47,6 +47,7 @@ jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length); jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); +jint Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port); jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 85cc6c16f0..9e7be0fdc9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -16,6 +16,7 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.channel.AddressedEnvelope; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; @@ -28,6 +29,7 @@ import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramPacket; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import java.io.IOException; @@ -288,7 +290,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } } - private boolean doWriteMessage(Object msg) throws IOException { + private boolean doWriteMessage(Object msg) throws Exception { final ByteBuf data; InetSocketAddress remoteAddress; if (msg instanceof AddressedEnvelope) { @@ -319,6 +321,13 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements long memoryAddress = data.memoryAddress(); writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(), remoteAddress.getAddress(), remoteAddress.getPort()); + } else if (data instanceof CompositeByteBuf) { + IovArray array = IovArray.get((CompositeByteBuf) data); + int cnt = array.count(); + assert cnt != 0; + + writtenBytes = Native.sendToAddresses(fd, array.memoryAddress(0), + cnt, remoteAddress.getAddress(), remoteAddress.getPort()); } else { ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes()); writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(), @@ -344,13 +353,24 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; - if (buf.hasMemoryAddress()) { - return msg; + if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) { + if (buf instanceof CompositeByteBuf) { + // Special handling of CompositeByteBuf to reduce memory copies if some of the Components + // in the CompositeByteBuf are backed by a memoryAddress. + CompositeByteBuf comp = (CompositeByteBuf) buf; + if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) { + // more then 1024 buffers for gathering writes so just do a memory copy. + buf = newDirectBuffer(buf); + assert buf.hasMemoryAddress(); + } + } else { + // We can only handle buffers with memory address so we need to copy if a non direct is + // passed to write. + buf = newDirectBuffer(buf); + assert buf.hasMemoryAddress(); + } } - - // We can only handle direct buffers so we need to copy if a non direct is - // passed to write. - return newDirectBuffer(buf); + return buf; } if (msg instanceof AddressedEnvelope) { @@ -363,7 +383,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (content.hasMemoryAddress()) { return e; } - + if (content instanceof CompositeByteBuf) { + // Special handling of CompositeByteBuf to reduce memory copies if some of the Components + // in the CompositeByteBuf are backed by a memoryAddress. + CompositeByteBuf comp = (CompositeByteBuf) content; + if (comp.isDirect() && comp.nioBufferCount() <= Native.IOV_MAX) { + return e; + } + } // We can only handle direct buffers so we need to copy if a non direct is // passed to write. return new DefaultAddressedEnvelope( diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java index e685d9760b..ca0fed8901 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java @@ -218,4 +218,12 @@ final class IovArray implements MessageProcessor { buffer.forEachFlushedMessage(array); return array; } + + static IovArray get(CompositeByteBuf buf) throws Exception { + IovArray array = ARRAY.get(); + array.size = 0; + array.count = 0; + array.processMessage(buf); + return array; + } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index bf13390339..fb42ef0736 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -118,6 +118,26 @@ final class Native { private static native int sendToAddress( int fd, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port) throws IOException; + public static int sendToAddresses( + int fd, long memoryAddress, int length, InetAddress addr, int port) throws IOException { + // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected + // to be called frequently + byte[] address; + int scopeId; + if (addr instanceof Inet6Address) { + address = addr.getAddress(); + scopeId = ((Inet6Address) addr).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + scopeId = 0; + address = ipv4MappedIpv6Address(addr.getAddress()); + } + return sendToAddresses(fd, memoryAddress, length, address, scopeId, port); + } + + private static native int sendToAddresses( + int fd, long memoryAddress, int length, byte[] address, int scopeId, int port) throws IOException; + public static native EpollDatagramChannel.DatagramSocketAddress recvFrom( int fd, ByteBuffer buf, int pos, int limit) throws IOException;