Add support for sendmmsg(...) and so allow to write multiple DatagramPackets with one syscall. Related to [#2719]

Motivation:

On linux with glibc >= 2.14 it is possible to send multiple DatagramPackets with one syscall. This can be a huge performance win and so we should support it in our native transport.

Modification:

- Add support for sendmmsg by reuse IovArray
- Factor out ThreadLocal support of IovArray to IovArrayThreadLocal for better separation as we use IovArray also without ThreadLocal in NativeDatagramPacketArray now
- Introduce NativeDatagramPacketArray which is used for sendmmsg(...)
- Implement sendmmsg(...) via jni
- Expand DatagramUnicastTest to test also sendmmsg(...)

Result:

Netty now automatically use sendmmsg(...) if it is supported and we have more then 1 DatagramPacket in the ChannelOutboundBuffer and flush() is called.
This commit is contained in:
Norman Maurer 2014-09-03 13:54:44 +01:00
parent 07876ebf68
commit 7867f986fa
9 changed files with 439 additions and 80 deletions

View File

@ -39,7 +39,8 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
}
public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
testSimpleSend0(sb, cb, Unpooled.directBuffer());
testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 1);
testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 4);
}
@Test
@ -48,7 +49,8 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
}
public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
testSimpleSend0(sb, cb, Unpooled.directBuffer());
testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 1);
testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 4);
}
@Test
@ -60,7 +62,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.directBuffer(2, 2));
buf.addComponent(Unpooled.directBuffer(2, 2));
testSimpleSend0(sb, cb, buf);
testSimpleSend0(sb, cb, buf, true, 1);
CompositeByteBuf buf2 = Unpooled.compositeBuffer();
buf2.addComponent(Unpooled.directBuffer(2, 2));
buf2.addComponent(Unpooled.directBuffer(2, 2));
testSimpleSend0(sb, cb, buf2, true, 4);
}
@Test
@ -72,7 +79,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.buffer(2, 2));
buf.addComponent(Unpooled.buffer(2, 2));
testSimpleSend0(sb, cb, buf);
testSimpleSend0(sb, cb, buf, true, 1);
CompositeByteBuf buf2 = Unpooled.compositeBuffer();
buf2.addComponent(Unpooled.buffer(2, 2));
buf2.addComponent(Unpooled.buffer(2, 2));
testSimpleSend0(sb, cb, buf2, true, 4);
}
@Test
@ -84,36 +96,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.directBuffer(2, 2));
buf.addComponent(Unpooled.buffer(2, 2));
testSimpleSend0(sb, cb, buf);
}
testSimpleSend0(sb, cb, buf, true, 1);
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf) throws Throwable {
buf.writeInt(1);
final CountDownLatch latch = new CountDownLatch(1);
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
assertEquals(1, msg.content().readInt());
latch.countDown();
}
});
cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
// Nothing will be sent.
}
});
Channel sc = sb.bind().sync().channel();
Channel cc = cb.bind().sync().channel();
cc.writeAndFlush(new DatagramPacket(buf, addr)).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
sc.close().sync();
cc.close().sync();
CompositeByteBuf buf2 = Unpooled.compositeBuffer();
buf2.addComponent(Unpooled.directBuffer(2, 2));
buf2.addComponent(Unpooled.buffer(2, 2));
testSimpleSend0(sb, cb, buf2, true, 4);
}
@Test
@ -121,9 +109,16 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
run();
}
@SuppressWarnings("deprecation")
public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
testSimpleSend0(sb, cb, Unpooled.directBuffer(), false, 1);
testSimpleSend0(sb, cb, Unpooled.directBuffer(), false, 4);
}
@SuppressWarnings("deprecation")
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient, int count)
throws Throwable {
buf.writeInt(1);
final CountDownLatch latch = new CountDownLatch(count);
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
@ -139,12 +134,22 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
// Nothing will be sent.
}
});
cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
Channel sc = sb.bind().sync().channel();
Channel cc = cb.register().sync().channel();
Channel cc;
if (bindClient) {
cc = cb.bind().sync().channel();
} else {
cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
cc = cb.register().sync().channel();
}
cc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), addr)).sync();
for (int i = 0; i < count; i++) {
cc.write(new DatagramPacket(buf.retain().duplicate(), addr));
}
// release as we used buf.retain() before
buf.release();
cc.flush();
assertTrue(latch.await(10, TimeUnit.SECONDS));
sc.close().sync();

View File

@ -34,6 +34,15 @@
// optional
extern int accept4(int sockFd, struct sockaddr *addr, socklen_t *addrlen, int flags) __attribute__((weak));
extern int epoll_create1(int flags) __attribute__((weak));
extern int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags) __attribute__((weak));
// Just define it here and NOT use #define _GNU_SOURCE as we also want to be able to build on systems that not support
// sendmmsg yet. The problem is if we use _GNU_SOURCE we will not be able to declare sendmmsg as extern
struct mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of bytes transmitted */
};
// Those are initialized in the init(...) method and cached for performance reasons
jmethodID updatePosId = NULL;
@ -44,7 +53,14 @@ jfieldID limitFieldId = NULL;
jfieldID fileChannelFieldId = NULL;
jfieldID transferedFieldId = NULL;
jfieldID fdFieldId = NULL;
jfieldID fileDescriptorFieldId = NULL;;
jfieldID fileDescriptorFieldId = NULL;
jfieldID packetAddrFieldId = NULL;
jfieldID packetScopeIdFieldId = NULL;
jfieldID packetPortFieldId = NULL;
jfieldID packetMemoryAddressFieldId = NULL;
jfieldID packetCountFieldId = NULL;
jmethodID inetSocketAddrMethodId = NULL;
jmethodID datagramSocketAddrMethodId = NULL;
jclass runtimeExceptionClass = NULL;
@ -53,6 +69,7 @@ jclass closedChannelExceptionClass = NULL;
jmethodID closedChannelExceptionMethodId = NULL;
jclass inetSocketAddressClass = NULL;
jclass datagramSocketAddressClass = NULL;
jclass nativeDatagramPacketClass = NULL;
static int socketType;
static const char *ip4prefix = "::ffff:";
@ -414,6 +431,38 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress");
return JNI_ERR;
}
jclass nativeDatagramPacketCls = (*env)->FindClass(env, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket");
if (nativeDatagramPacketCls == NULL) {
// pending exception...
return JNI_ERR;
}
packetAddrFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "addr", "[B");
if (packetAddrFieldId == NULL) {
throwRuntimeException(env, "Unable to obtain addr field for NativeDatagramPacket");
return JNI_ERR;
}
packetScopeIdFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "scopeId", "I");
if (packetScopeIdFieldId == NULL) {
throwRuntimeException(env, "Unable to obtain scopeId field for NativeDatagramPacket");
return JNI_ERR;
}
packetPortFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "port", "I");
if (packetPortFieldId == NULL) {
throwRuntimeException(env, "Unable to obtain port field for NativeDatagramPacket");
return JNI_ERR;
}
packetMemoryAddressFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "memoryAddress", "J");
if (packetMemoryAddressFieldId == NULL) {
throwRuntimeException(env, "Unable to obtain memoryAddress field for NativeDatagramPacket");
return JNI_ERR;
}
packetCountFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "count", "I");
if (packetCountFieldId == NULL) {
throwRuntimeException(env, "Unable to obtain count field for NativeDatagramPacket");
return JNI_ERR;
}
return JNI_VERSION_1_6;
}
}
@ -690,6 +739,53 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv
return (jint) res;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendmmsg(JNIEnv * env, jclass clazz, jint fd, jobjectArray packets, jint offset, jint len) {
struct mmsghdr msg[len];
int i;
memset(msg, 0, sizeof(msg));
for (i = 0; i < len; i++) {
struct sockaddr_storage addr;
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);
jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId);
jint port = (*env)->GetIntField(env, packet, packetPortFieldId);
if (init_sockaddr(env, address, scopeId, port, &addr) == -1) {
return -1;
}
msg[i].msg_hdr.msg_name = &addr;
msg[i].msg_hdr.msg_namelen = sizeof(addr);
msg[i].msg_hdr.msg_iov = (struct iovec *) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId);
msg[i].msg_hdr.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId);;
}
ssize_t res;
int err;
do {
res = sendmmsg(fd, msg, len, 0);
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));
if (res < 0) {
// network stack saturated... try again later
if (err == EAGAIN || err == EWOULDBLOCK) {
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while sendmmsg(...): ", err));
return -1;
}
return (jint) res;
}
jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
@ -1226,3 +1322,16 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz) {
return IOV_MAX;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_uioMaxIov(JNIEnv *env, jclass clazz) {
return UIO_MAXIOV;
}
JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv *env, jclass clazz) {
if (sendmmsg) {
return JNI_TRUE;
}
return JNI_FALSE;
}

View File

@ -33,6 +33,11 @@
#define IOV_MAX 1024
#endif /* IOV_MAX */
// Define UIO_MAXIOV if not found
#ifndef UIO_MAXIOV
#define UIO_MAXIOV 1024
#endif /* UIO_MAXIOV */
jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz);
void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value);
void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd);
@ -48,6 +53,7 @@ jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass cl
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_sendmmsg(JNIEnv * env, jclass clazz, jint fd, jobjectArray packets, jint offset, jint len);
jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
@ -95,3 +101,5 @@ jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv *env, jclass clazz,
jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_uioMaxIov(JNIEnv *env, jclass clazz);
jboolean Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv *env, jclass clazz);

View File

@ -266,6 +266,32 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
try {
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
int cnt = array.count();
if (cnt >= 1) {
// Try to use gathering writes via sendmmsg(...) syscall.
int offset = 0;
NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
while (cnt > 0) {
int send = Native.sendmmsg(fd, packets, offset, cnt);
if (send == 0) {
// Did not write all messages.
setEpollOut();
return;
}
for (int i = 0; i < send; i++) {
in.remove();
}
cnt -= send;
offset += send;
}
continue;
}
}
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
if (doWriteMessage(msg)) {
@ -322,7 +348,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
} else if (data instanceof CompositeByteBuf) {
IovArray array = IovArray.get((CompositeByteBuf) data);
IovArray array = IovArrayThreadLocal.get((CompositeByteBuf) data);
int cnt = array.count();
assert cnt != 0;

View File

@ -358,7 +358,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = IovArray.get(in);
IovArray array = IovArrayThreadLocal.get(in);
int cnt = array.count();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.

View File

@ -17,9 +17,7 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
@ -52,37 +50,30 @@ final class IovArray implements MessageProcessor {
*/
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
/** The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified
/**
* The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified
* the maximum number of {@code iovec} structs that can be passed to {@code writev(...)}.
*/
private static final int CAPACITY = Native.IOV_MAX * IOV_SIZE;
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
@Override
protected IovArray initialValue() throws Exception {
return new IovArray();
}
@Override
protected void onRemoval(IovArray value) throws Exception {
// free the direct memory now
PlatformDependent.freeMemory(value.memoryAddress);
}
};
private final long memoryAddress;
private int count;
private long size;
private IovArray() {
IovArray() {
memoryAddress = PlatformDependent.allocateMemory(CAPACITY);
}
void clear() {
count = 0;
size = 0;
}
/**
* Try to add the given {@link ByteBuf}. Returns {@code true} on success,
* {@code false} otherwise.
*/
private boolean add(ByteBuf buf) {
boolean add(ByteBuf buf) {
if (count == Native.IOV_MAX) {
// No more room!
return false;
@ -124,7 +115,11 @@ final class IovArray implements MessageProcessor {
size += len;
}
private boolean add(CompositeByteBuf buf) {
/**
* Try to add the given {@link CompositeByteBuf}. Returns {@code true} on success,
* {@code false} otherwise.
*/
boolean add(CompositeByteBuf buf) {
ByteBuffer[] buffers = buf.nioBuffers();
if (count + buffers.length >= Native.IOV_MAX) {
// No more room!
@ -196,6 +191,13 @@ final class IovArray implements MessageProcessor {
return memoryAddress + IOV_SIZE * offset;
}
/**
* Release the {@link IovArray}. Once release further using of it may crash the JVM!
*/
void release() {
PlatformDependent.freeMemory(memoryAddress);
}
@Override
public boolean processMessage(Object msg) throws Exception {
if (msg instanceof ByteBuf) {
@ -207,23 +209,4 @@ final class IovArray implements MessageProcessor {
}
return false;
}
/**
* Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}.
*/
static IovArray get(ChannelOutboundBuffer buffer) throws Exception {
IovArray array = ARRAY.get();
array.size = 0;
array.count = 0;
buffer.forEachFlushedMessage(array);
return array;
}
static IovArray get(CompositeByteBuf buf) throws Exception {
IovArray array = ARRAY.get();
array.size = 0;
array.count = 0;
array.processMessage(buf);
return array;
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.concurrent.FastThreadLocal;
/**
* Allow to obtain {@link IovArray} instances.
*/
final class IovArrayThreadLocal {
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
@Override
protected IovArray initialValue() throws Exception {
return new IovArray();
}
@Override
protected void onRemoval(IovArray value) throws Exception {
// free the direct memory now
value.release();
}
};
/**
* Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}.
*/
static IovArray get(ChannelOutboundBuffer buffer) throws Exception {
IovArray array = ARRAY.get();
array.clear();
buffer.forEachFlushedMessage(array);
return array;
}
/**
* Returns a {@link IovArray} which is filled with the {@link CompositeByteBuf}.
*/
static IovArray get(CompositeByteBuf buf) throws Exception {
IovArray array = ARRAY.get();
array.clear();
array.add(buf);
return array;
}
private IovArrayThreadLocal() { }
}

View File

@ -52,6 +52,8 @@ final class Native {
public static final int EPOLLACCEPT = 0x04;
public static final int EPOLLRDHUP = 0x08;
public static final int IOV_MAX = iovMax();
public static final int UIO_MAX_IOV = uioMaxIov();
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();
public static native int eventFd();
public static native void eventFdWrite(int fd, long value);
@ -144,6 +146,11 @@ final class Native {
public static native EpollDatagramChannel.DatagramSocketAddress recvFromAddress(
int fd, long memoryAddress, int pos, int limit) throws IOException;
public static native int sendmmsg(
int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len) throws IOException;
private static native boolean isSupportingSendmmsg();
// socket operations
public static int socketStreamFd() {
try {
@ -168,7 +175,7 @@ final class Native {
bind(fd, address.address, address.scopeId, port);
}
private static byte[] ipv4MappedIpv6Address(byte[] ipv4) {
static byte[] ipv4MappedIpv6Address(byte[] ipv4) {
byte[] address = new byte[16];
System.arraycopy(IPV4_MAPPED_IPV6_PREFIX, 0, address, 0, IPV4_MAPPED_IPV6_PREFIX.length);
System.arraycopy(ipv4, 0, address, 12, ipv4.length);
@ -246,6 +253,8 @@ final class Native {
private static native int iovMax();
private static native int uioMaxIov();
private Native() {
// utility
}

View File

@ -0,0 +1,158 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.concurrent.FastThreadLocal;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
/**
* Support <a href="http://linux.die.net/man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+
*/
final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor {
private static final FastThreadLocal<NativeDatagramPacketArray> ARRAY =
new FastThreadLocal<NativeDatagramPacketArray>() {
@Override
protected NativeDatagramPacketArray initialValue() throws Exception {
return new NativeDatagramPacketArray();
}
@Override
protected void onRemoval(NativeDatagramPacketArray value) throws Exception {
NativeDatagramPacket[] array = value.packets;
// Release all packets
for (int i = 0; i < array.length; i++) {
array[i].release();
}
}
};
// Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
private final NativeDatagramPacket[] packets = new NativeDatagramPacket[Native.UIO_MAX_IOV];
private int count;
private NativeDatagramPacketArray() {
for (int i = 0; i < packets.length; i++) {
packets[i] = new NativeDatagramPacket();
}
}
/**
* Try to add the given {@link DatagramPacket}. Returns {@code true} on success,
* {@code false} otherwise.
*/
boolean add(DatagramPacket packet) {
if (count == packets.length) {
return false;
}
ByteBuf content = packet.content();
int len = content.readableBytes();
if (len == 0) {
return true;
}
NativeDatagramPacket p = packets[count];
InetSocketAddress recipient = packet.recipient();
if (!p.init(content, recipient)) {
return false;
}
count++;
return true;
}
@Override
public boolean processMessage(Object msg) throws Exception {
return msg instanceof DatagramPacket && add((DatagramPacket) msg);
}
/**
* Returns the count
*/
int count() {
return count;
}
/**
* Returns an array with {@link #count()} {@link NativeDatagramPacket}s filled.
*/
NativeDatagramPacket[] packets() {
return packets;
}
/**
* Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of
* {@link ChannelOutboundBuffer}.
*/
static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception {
NativeDatagramPacketArray array = ARRAY.get();
array.count = 0;
buffer.forEachFlushedMessage(array);
return array;
}
/**
* Used to pass needed data to JNI.
*/
@SuppressWarnings("unused")
static final class NativeDatagramPacket {
// Each NativeDatagramPackets holds a IovArray which is used for gathering writes.
// This is ok as NativeDatagramPacketArray is always obtained via a FastThreadLocal and
// so the memory needed is quite small anyway.
private final IovArray array = new IovArray();
// This is the actual struct iovec*
private long memoryAddress;
private int count;
private byte[] addr;
private int scopeId;
private int port;
private void release() {
array.release();
}
/**
* Init this instance and return {@code true} if the init was successful.
*/
private boolean init(ByteBuf buf, InetSocketAddress recipient) {
array.clear();
if (!array.add(buf)) {
return false;
}
// always start from offset 0
memoryAddress = array.memoryAddress(0);
count = array.count();
InetAddress address = recipient.getAddress();
if (address instanceof Inet6Address) {
addr = address.getAddress();
scopeId = ((Inet6Address) address).getScopeId();
} else {
addr = Native.ipv4MappedIpv6Address(address.getAddress());
scopeId = 0;
}
port = recipient.getPort();
return true;
}
}
}