diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java index c4a1b14829..07dd9ded4c 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java @@ -39,7 +39,8 @@ public class DatagramUnicastTest extends AbstractDatagramTest { } public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { - testSimpleSend0(sb, cb, Unpooled.directBuffer()); + testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 1); + testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 4); } @Test @@ -48,7 +49,8 @@ public class DatagramUnicastTest extends AbstractDatagramTest { } public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { - testSimpleSend0(sb, cb, Unpooled.directBuffer()); + testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 1); + testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 4); } @Test @@ -60,7 +62,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest { CompositeByteBuf buf = Unpooled.compositeBuffer(); buf.addComponent(Unpooled.directBuffer(2, 2)); buf.addComponent(Unpooled.directBuffer(2, 2)); - testSimpleSend0(sb, cb, buf); + testSimpleSend0(sb, cb, buf, true, 1); + + CompositeByteBuf buf2 = Unpooled.compositeBuffer(); + buf2.addComponent(Unpooled.directBuffer(2, 2)); + buf2.addComponent(Unpooled.directBuffer(2, 2)); + testSimpleSend0(sb, cb, buf2, true, 4); } @Test @@ -72,7 +79,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest { CompositeByteBuf buf = Unpooled.compositeBuffer(); buf.addComponent(Unpooled.buffer(2, 2)); buf.addComponent(Unpooled.buffer(2, 2)); - testSimpleSend0(sb, cb, buf); + testSimpleSend0(sb, cb, buf, true, 1); + + CompositeByteBuf buf2 = Unpooled.compositeBuffer(); + buf2.addComponent(Unpooled.buffer(2, 2)); + buf2.addComponent(Unpooled.buffer(2, 2)); + testSimpleSend0(sb, cb, buf2, true, 4); } @Test @@ -84,36 +96,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest { CompositeByteBuf buf = Unpooled.compositeBuffer(); buf.addComponent(Unpooled.directBuffer(2, 2)); buf.addComponent(Unpooled.buffer(2, 2)); - testSimpleSend0(sb, cb, buf); - } + testSimpleSend0(sb, cb, buf, true, 1); - private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf) throws Throwable { - buf.writeInt(1); - final CountDownLatch latch = new CountDownLatch(1); - - sb.handler(new SimpleChannelInboundHandler() { - @Override - public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { - assertEquals(1, msg.content().readInt()); - latch.countDown(); - } - }); - - cb.handler(new SimpleChannelInboundHandler() { - @Override - public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception { - // Nothing will be sent. - } - }); - - Channel sc = sb.bind().sync().channel(); - Channel cc = cb.bind().sync().channel(); - - cc.writeAndFlush(new DatagramPacket(buf, addr)).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); - - sc.close().sync(); - cc.close().sync(); + CompositeByteBuf buf2 = Unpooled.compositeBuffer(); + buf2.addComponent(Unpooled.directBuffer(2, 2)); + buf2.addComponent(Unpooled.buffer(2, 2)); + testSimpleSend0(sb, cb, buf2, true, 4); } @Test @@ -121,9 +109,16 @@ public class DatagramUnicastTest extends AbstractDatagramTest { run(); } - @SuppressWarnings("deprecation") public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable { - final CountDownLatch latch = new CountDownLatch(1); + testSimpleSend0(sb, cb, Unpooled.directBuffer(), false, 1); + testSimpleSend0(sb, cb, Unpooled.directBuffer(), false, 4); + } + + @SuppressWarnings("deprecation") + private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient, int count) + throws Throwable { + buf.writeInt(1); + final CountDownLatch latch = new CountDownLatch(count); sb.handler(new SimpleChannelInboundHandler() { @Override @@ -139,12 +134,22 @@ public class DatagramUnicastTest extends AbstractDatagramTest { // Nothing will be sent. } }); - cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true); Channel sc = sb.bind().sync().channel(); - Channel cc = cb.register().sync().channel(); + Channel cc; + if (bindClient) { + cc = cb.bind().sync().channel(); + } else { + cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true); + cc = cb.register().sync().channel(); + } - cc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), addr)).sync(); + for (int i = 0; i < count; i++) { + cc.write(new DatagramPacket(buf.retain().duplicate(), addr)); + } + // release as we used buf.retain() before + buf.release(); + cc.flush(); assertTrue(latch.await(10, TimeUnit.SECONDS)); sc.close().sync(); diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index c3b416d8b7..5dc7eb5825 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -34,6 +34,15 @@ // optional extern int accept4(int sockFd, struct sockaddr *addr, socklen_t *addrlen, int flags) __attribute__((weak)); extern int epoll_create1(int flags) __attribute__((weak)); +extern int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags) __attribute__((weak)); + +// Just define it here and NOT use #define _GNU_SOURCE as we also want to be able to build on systems that not support +// sendmmsg yet. The problem is if we use _GNU_SOURCE we will not be able to declare sendmmsg as extern +struct mmsghdr { + struct msghdr msg_hdr; /* Message header */ + unsigned int msg_len; /* Number of bytes transmitted */ +}; + // Those are initialized in the init(...) method and cached for performance reasons jmethodID updatePosId = NULL; @@ -44,7 +53,14 @@ jfieldID limitFieldId = NULL; jfieldID fileChannelFieldId = NULL; jfieldID transferedFieldId = NULL; jfieldID fdFieldId = NULL; -jfieldID fileDescriptorFieldId = NULL;; +jfieldID fileDescriptorFieldId = NULL; + +jfieldID packetAddrFieldId = NULL; +jfieldID packetScopeIdFieldId = NULL; +jfieldID packetPortFieldId = NULL; +jfieldID packetMemoryAddressFieldId = NULL; +jfieldID packetCountFieldId = NULL; + jmethodID inetSocketAddrMethodId = NULL; jmethodID datagramSocketAddrMethodId = NULL; jclass runtimeExceptionClass = NULL; @@ -53,6 +69,7 @@ jclass closedChannelExceptionClass = NULL; jmethodID closedChannelExceptionMethodId = NULL; jclass inetSocketAddressClass = NULL; jclass datagramSocketAddressClass = NULL; +jclass nativeDatagramPacketClass = NULL; static int socketType; static const char *ip4prefix = "::ffff:"; @@ -414,6 +431,38 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress"); return JNI_ERR; } + jclass nativeDatagramPacketCls = (*env)->FindClass(env, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket"); + if (nativeDatagramPacketCls == NULL) { + // pending exception... + return JNI_ERR; + } + + packetAddrFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "addr", "[B"); + if (packetAddrFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain addr field for NativeDatagramPacket"); + return JNI_ERR; + } + packetScopeIdFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "scopeId", "I"); + if (packetScopeIdFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain scopeId field for NativeDatagramPacket"); + return JNI_ERR; + } + packetPortFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "port", "I"); + if (packetPortFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain port field for NativeDatagramPacket"); + return JNI_ERR; + } + packetMemoryAddressFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "memoryAddress", "J"); + if (packetMemoryAddressFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain memoryAddress field for NativeDatagramPacket"); + return JNI_ERR; + } + + packetCountFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "count", "I"); + if (packetCountFieldId == NULL) { + throwRuntimeException(env, "Unable to obtain count field for NativeDatagramPacket"); + return JNI_ERR; + } return JNI_VERSION_1_6; } } @@ -690,6 +739,53 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv return (jint) res; } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendmmsg(JNIEnv * env, jclass clazz, jint fd, jobjectArray packets, jint offset, jint len) { + struct mmsghdr msg[len]; + int i; + + memset(msg, 0, sizeof(msg)); + + for (i = 0; i < len; i++) { + struct sockaddr_storage addr; + + jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset); + jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId); + jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId); + jint port = (*env)->GetIntField(env, packet, packetPortFieldId); + + if (init_sockaddr(env, address, scopeId, port, &addr) == -1) { + return -1; + } + + msg[i].msg_hdr.msg_name = &addr; + msg[i].msg_hdr.msg_namelen = sizeof(addr); + + msg[i].msg_hdr.msg_iov = (struct iovec *) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId); + msg[i].msg_hdr.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId);; + } + + ssize_t res; + int err; + do { + res = sendmmsg(fd, msg, len, 0); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + // network stack saturated... try again later + if (err == EAGAIN || err == EWOULDBLOCK) { + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while sendmmsg(...): ", err)); + return -1; + } + return (jint) res; +} + jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) { struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); @@ -1226,3 +1322,16 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz) { return IOV_MAX; } + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_uioMaxIov(JNIEnv *env, jclass clazz) { + return UIO_MAXIOV; +} + + +JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv *env, jclass clazz) { + if (sendmmsg) { + return JNI_TRUE; + } + return JNI_FALSE; +} + diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index feed7c0e6b..a6e9fb04aa 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -33,6 +33,11 @@ #define IOV_MAX 1024 #endif /* IOV_MAX */ +// Define UIO_MAXIOV if not found +#ifndef UIO_MAXIOV +#define UIO_MAXIOV 1024 +#endif /* UIO_MAXIOV */ + jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz); void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value); void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd); @@ -48,6 +53,7 @@ jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass cl jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); jint Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port); +jint Java_io_netty_channel_epoll_Native_sendmmsg(JNIEnv * env, jclass clazz, jint fd, jobjectArray packets, jint offset, jint len); jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); @@ -95,3 +101,5 @@ jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv *env, jclass clazz, jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz); jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_uioMaxIov(JNIEnv *env, jclass clazz); +jboolean Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv *env, jclass clazz); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 6d589b252a..75d3415096 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -266,6 +266,32 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } try { + // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+ + if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) { + NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in); + int cnt = array.count(); + + if (cnt >= 1) { + // Try to use gathering writes via sendmmsg(...) syscall. + int offset = 0; + NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets(); + + while (cnt > 0) { + int send = Native.sendmmsg(fd, packets, offset, cnt); + if (send == 0) { + // Did not write all messages. + setEpollOut(); + return; + } + for (int i = 0; i < send; i++) { + in.remove(); + } + cnt -= send; + offset += send; + } + continue; + } + } boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { if (doWriteMessage(msg)) { @@ -322,7 +348,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(), remoteAddress.getAddress(), remoteAddress.getPort()); } else if (data instanceof CompositeByteBuf) { - IovArray array = IovArray.get((CompositeByteBuf) data); + IovArray array = IovArrayThreadLocal.get((CompositeByteBuf) data); int cnt = array.count(); assert cnt != 0; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index d38bef175e..975178ac8b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -358,7 +358,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception { if (PlatformDependent.hasUnsafe()) { // this means we can cast to IovArray and write the IovArray directly. - IovArray array = IovArray.get(in); + IovArray array = IovArrayThreadLocal.get(in); int cnt = array.count(); if (cnt >= 1) { // TODO: Handle the case where cnt == 1 specially. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java index ca0fed8901..e489837f86 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArray.java @@ -17,9 +17,7 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; -import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer.MessageProcessor; -import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; import java.nio.ByteBuffer; @@ -52,37 +50,30 @@ final class IovArray implements MessageProcessor { */ private static final int IOV_SIZE = 2 * ADDRESS_SIZE; - /** The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified + /** + * The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified * the maximum number of {@code iovec} structs that can be passed to {@code writev(...)}. */ private static final int CAPACITY = Native.IOV_MAX * IOV_SIZE; - private static final FastThreadLocal ARRAY = new FastThreadLocal() { - @Override - protected IovArray initialValue() throws Exception { - return new IovArray(); - } - - @Override - protected void onRemoval(IovArray value) throws Exception { - // free the direct memory now - PlatformDependent.freeMemory(value.memoryAddress); - } - }; - private final long memoryAddress; private int count; private long size; - private IovArray() { + IovArray() { memoryAddress = PlatformDependent.allocateMemory(CAPACITY); } + void clear() { + count = 0; + size = 0; + } + /** * Try to add the given {@link ByteBuf}. Returns {@code true} on success, * {@code false} otherwise. */ - private boolean add(ByteBuf buf) { + boolean add(ByteBuf buf) { if (count == Native.IOV_MAX) { // No more room! return false; @@ -124,7 +115,11 @@ final class IovArray implements MessageProcessor { size += len; } - private boolean add(CompositeByteBuf buf) { + /** + * Try to add the given {@link CompositeByteBuf}. Returns {@code true} on success, + * {@code false} otherwise. + */ + boolean add(CompositeByteBuf buf) { ByteBuffer[] buffers = buf.nioBuffers(); if (count + buffers.length >= Native.IOV_MAX) { // No more room! @@ -196,6 +191,13 @@ final class IovArray implements MessageProcessor { return memoryAddress + IOV_SIZE * offset; } + /** + * Release the {@link IovArray}. Once release further using of it may crash the JVM! + */ + void release() { + PlatformDependent.freeMemory(memoryAddress); + } + @Override public boolean processMessage(Object msg) throws Exception { if (msg instanceof ByteBuf) { @@ -207,23 +209,4 @@ final class IovArray implements MessageProcessor { } return false; } - - /** - * Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}. - */ - static IovArray get(ChannelOutboundBuffer buffer) throws Exception { - IovArray array = ARRAY.get(); - array.size = 0; - array.count = 0; - buffer.forEachFlushedMessage(array); - return array; - } - - static IovArray get(CompositeByteBuf buf) throws Exception { - IovArray array = ARRAY.get(); - array.size = 0; - array.count = 0; - array.processMessage(buf); - return array; - } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArrayThreadLocal.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArrayThreadLocal.java new file mode 100644 index 0000000000..b7f66dcb42 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/IovArrayThreadLocal.java @@ -0,0 +1,61 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.util.concurrent.FastThreadLocal; + +/** + * Allow to obtain {@link IovArray} instances. + */ +final class IovArrayThreadLocal { + + private static final FastThreadLocal ARRAY = new FastThreadLocal() { + @Override + protected IovArray initialValue() throws Exception { + return new IovArray(); + } + + @Override + protected void onRemoval(IovArray value) throws Exception { + // free the direct memory now + value.release(); + } + }; + + /** + * Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}. + */ + static IovArray get(ChannelOutboundBuffer buffer) throws Exception { + IovArray array = ARRAY.get(); + array.clear(); + buffer.forEachFlushedMessage(array); + return array; + } + + /** + * Returns a {@link IovArray} which is filled with the {@link CompositeByteBuf}. + */ + static IovArray get(CompositeByteBuf buf) throws Exception { + IovArray array = ARRAY.get(); + array.clear(); + array.add(buf); + return array; + } + + private IovArrayThreadLocal() { } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 814dc83a39..5bbd890692 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -52,6 +52,8 @@ final class Native { public static final int EPOLLACCEPT = 0x04; public static final int EPOLLRDHUP = 0x08; public static final int IOV_MAX = iovMax(); + public static final int UIO_MAX_IOV = uioMaxIov(); + public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg(); public static native int eventFd(); public static native void eventFdWrite(int fd, long value); @@ -144,6 +146,11 @@ final class Native { public static native EpollDatagramChannel.DatagramSocketAddress recvFromAddress( int fd, long memoryAddress, int pos, int limit) throws IOException; + public static native int sendmmsg( + int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len) throws IOException; + + private static native boolean isSupportingSendmmsg(); + // socket operations public static int socketStreamFd() { try { @@ -168,7 +175,7 @@ final class Native { bind(fd, address.address, address.scopeId, port); } - private static byte[] ipv4MappedIpv6Address(byte[] ipv4) { + static byte[] ipv4MappedIpv6Address(byte[] ipv4) { byte[] address = new byte[16]; System.arraycopy(IPV4_MAPPED_IPV6_PREFIX, 0, address, 0, IPV4_MAPPED_IPV6_PREFIX.length); System.arraycopy(ipv4, 0, address, 12, ipv4.length); @@ -246,6 +253,8 @@ final class Native { private static native int iovMax(); + private static native int uioMaxIov(); + private Native() { // utility } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java new file mode 100644 index 0000000000..2861e51032 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -0,0 +1,158 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.concurrent.FastThreadLocal; + +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +/** + * Support sendmmsg(...) on linux with GLIBC 2.14+ + */ +final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor { + + private static final FastThreadLocal ARRAY = + new FastThreadLocal() { + @Override + protected NativeDatagramPacketArray initialValue() throws Exception { + return new NativeDatagramPacketArray(); + } + + @Override + protected void onRemoval(NativeDatagramPacketArray value) throws Exception { + NativeDatagramPacket[] array = value.packets; + // Release all packets + for (int i = 0; i < array.length; i++) { + array[i].release(); + } + } + }; + + // Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call. + private final NativeDatagramPacket[] packets = new NativeDatagramPacket[Native.UIO_MAX_IOV]; + private int count; + + private NativeDatagramPacketArray() { + for (int i = 0; i < packets.length; i++) { + packets[i] = new NativeDatagramPacket(); + } + } + + /** + * Try to add the given {@link DatagramPacket}. Returns {@code true} on success, + * {@code false} otherwise. + */ + boolean add(DatagramPacket packet) { + if (count == packets.length) { + return false; + } + ByteBuf content = packet.content(); + int len = content.readableBytes(); + if (len == 0) { + return true; + } + NativeDatagramPacket p = packets[count]; + InetSocketAddress recipient = packet.recipient(); + if (!p.init(content, recipient)) { + return false; + } + + count++; + return true; + } + + @Override + public boolean processMessage(Object msg) throws Exception { + return msg instanceof DatagramPacket && add((DatagramPacket) msg); + } + + /** + * Returns the count + */ + int count() { + return count; + } + + /** + * Returns an array with {@link #count()} {@link NativeDatagramPacket}s filled. + */ + NativeDatagramPacket[] packets() { + return packets; + } + + /** + * Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of + * {@link ChannelOutboundBuffer}. + */ + static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception { + NativeDatagramPacketArray array = ARRAY.get(); + array.count = 0; + buffer.forEachFlushedMessage(array); + return array; + } + + /** + * Used to pass needed data to JNI. + */ + @SuppressWarnings("unused") + static final class NativeDatagramPacket { + // Each NativeDatagramPackets holds a IovArray which is used for gathering writes. + // This is ok as NativeDatagramPacketArray is always obtained via a FastThreadLocal and + // so the memory needed is quite small anyway. + private final IovArray array = new IovArray(); + + // This is the actual struct iovec* + private long memoryAddress; + private int count; + + private byte[] addr; + private int scopeId; + private int port; + + private void release() { + array.release(); + } + + /** + * Init this instance and return {@code true} if the init was successful. + */ + private boolean init(ByteBuf buf, InetSocketAddress recipient) { + array.clear(); + if (!array.add(buf)) { + return false; + } + // always start from offset 0 + memoryAddress = array.memoryAddress(0); + count = array.count(); + + InetAddress address = recipient.getAddress(); + if (address instanceof Inet6Address) { + addr = address.getAddress(); + scopeId = ((Inet6Address) address).getScopeId(); + } else { + addr = Native.ipv4MappedIpv6Address(address.getAddress()); + scopeId = 0; + } + port = recipient.getPort(); + return true; + } + } +}