Add support for sendmmsg(...) and so allow to write multiple DatagramPackets with one syscall. Related to [#2719]
Motivation: On linux with glibc >= 2.14 it is possible to send multiple DatagramPackets with one syscall. This can be a huge performance win and so we should support it in our native transport. Modification: - Add support for sendmmsg by reuse IovArray - Factor out ThreadLocal support of IovArray to IovArrayThreadLocal for better separation as we use IovArray also without ThreadLocal in NativeDatagramPacketArray now - Introduce NativeDatagramPacketArray which is used for sendmmsg(...) - Implement sendmmsg(...) via jni - Expand DatagramUnicastTest to test also sendmmsg(...) Result: Netty now automatically use sendmmsg(...) if it is supported and we have more then 1 DatagramPacket in the ChannelOutboundBuffer and flush() is called.
This commit is contained in:
parent
f1f14f524a
commit
d5b9f58f1f
@ -39,7 +39,8 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
}
|
||||
|
||||
public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer());
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 1);
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -48,7 +49,8 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
}
|
||||
|
||||
public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer());
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 1);
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer(), true, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -60,7 +62,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||
buf.addComponent(Unpooled.directBuffer(2, 2));
|
||||
buf.addComponent(Unpooled.directBuffer(2, 2));
|
||||
testSimpleSend0(sb, cb, buf);
|
||||
testSimpleSend0(sb, cb, buf, true, 1);
|
||||
|
||||
CompositeByteBuf buf2 = Unpooled.compositeBuffer();
|
||||
buf2.addComponent(Unpooled.directBuffer(2, 2));
|
||||
buf2.addComponent(Unpooled.directBuffer(2, 2));
|
||||
testSimpleSend0(sb, cb, buf2, true, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -72,7 +79,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||
buf.addComponent(Unpooled.buffer(2, 2));
|
||||
buf.addComponent(Unpooled.buffer(2, 2));
|
||||
testSimpleSend0(sb, cb, buf);
|
||||
testSimpleSend0(sb, cb, buf, true, 1);
|
||||
|
||||
CompositeByteBuf buf2 = Unpooled.compositeBuffer();
|
||||
buf2.addComponent(Unpooled.buffer(2, 2));
|
||||
buf2.addComponent(Unpooled.buffer(2, 2));
|
||||
testSimpleSend0(sb, cb, buf2, true, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -84,36 +96,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||
buf.addComponent(Unpooled.directBuffer(2, 2));
|
||||
buf.addComponent(Unpooled.buffer(2, 2));
|
||||
testSimpleSend0(sb, cb, buf);
|
||||
}
|
||||
testSimpleSend0(sb, cb, buf, true, 1);
|
||||
|
||||
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf) throws Throwable {
|
||||
buf.writeInt(1);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
|
||||
assertEquals(1, msg.content().readInt());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
cb.handler(new SimpleChannelInboundHandler<Object>() {
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msgs) throws Exception {
|
||||
// Nothing will be sent.
|
||||
}
|
||||
});
|
||||
|
||||
Channel sc = sb.bind().sync().channel();
|
||||
Channel cc = cb.bind().sync().channel();
|
||||
|
||||
cc.writeAndFlush(new DatagramPacket(buf, addr)).sync();
|
||||
assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||
|
||||
sc.close().sync();
|
||||
cc.close().sync();
|
||||
CompositeByteBuf buf2 = Unpooled.compositeBuffer();
|
||||
buf2.addComponent(Unpooled.directBuffer(2, 2));
|
||||
buf2.addComponent(Unpooled.buffer(2, 2));
|
||||
testSimpleSend0(sb, cb, buf2, true, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -121,9 +109,16 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
run();
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer(), false, 1);
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer(), false, 4);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient, int count)
|
||||
throws Throwable {
|
||||
buf.writeInt(1);
|
||||
final CountDownLatch latch = new CountDownLatch(count);
|
||||
|
||||
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
|
||||
@Override
|
||||
@ -139,12 +134,22 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
// Nothing will be sent.
|
||||
}
|
||||
});
|
||||
cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
|
||||
|
||||
Channel sc = sb.bind().sync().channel();
|
||||
Channel cc = cb.register().sync().channel();
|
||||
Channel cc;
|
||||
if (bindClient) {
|
||||
cc = cb.bind().sync().channel();
|
||||
} else {
|
||||
cb.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
|
||||
cc = cb.register().sync().channel();
|
||||
}
|
||||
|
||||
cc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), addr)).sync();
|
||||
for (int i = 0; i < count; i++) {
|
||||
cc.write(new DatagramPacket(buf.retain().duplicate(), addr));
|
||||
}
|
||||
// release as we used buf.retain() before
|
||||
buf.release();
|
||||
cc.flush();
|
||||
assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||
|
||||
sc.close().sync();
|
||||
|
@ -34,6 +34,15 @@
|
||||
// optional
|
||||
extern int accept4(int sockFd, struct sockaddr *addr, socklen_t *addrlen, int flags) __attribute__((weak));
|
||||
extern int epoll_create1(int flags) __attribute__((weak));
|
||||
extern int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags) __attribute__((weak));
|
||||
|
||||
// Just define it here and NOT use #define _GNU_SOURCE as we also want to be able to build on systems that not support
|
||||
// sendmmsg yet. The problem is if we use _GNU_SOURCE we will not be able to declare sendmmsg as extern
|
||||
struct mmsghdr {
|
||||
struct msghdr msg_hdr; /* Message header */
|
||||
unsigned int msg_len; /* Number of bytes transmitted */
|
||||
};
|
||||
|
||||
|
||||
// Those are initialized in the init(...) method and cached for performance reasons
|
||||
jmethodID updatePosId = NULL;
|
||||
@ -44,7 +53,14 @@ jfieldID limitFieldId = NULL;
|
||||
jfieldID fileChannelFieldId = NULL;
|
||||
jfieldID transferedFieldId = NULL;
|
||||
jfieldID fdFieldId = NULL;
|
||||
jfieldID fileDescriptorFieldId = NULL;;
|
||||
jfieldID fileDescriptorFieldId = NULL;
|
||||
|
||||
jfieldID packetAddrFieldId = NULL;
|
||||
jfieldID packetScopeIdFieldId = NULL;
|
||||
jfieldID packetPortFieldId = NULL;
|
||||
jfieldID packetMemoryAddressFieldId = NULL;
|
||||
jfieldID packetCountFieldId = NULL;
|
||||
|
||||
jmethodID inetSocketAddrMethodId = NULL;
|
||||
jmethodID datagramSocketAddrMethodId = NULL;
|
||||
jclass runtimeExceptionClass = NULL;
|
||||
@ -53,6 +69,7 @@ jclass closedChannelExceptionClass = NULL;
|
||||
jmethodID closedChannelExceptionMethodId = NULL;
|
||||
jclass inetSocketAddressClass = NULL;
|
||||
jclass datagramSocketAddressClass = NULL;
|
||||
jclass nativeDatagramPacketClass = NULL;
|
||||
|
||||
static int socketType;
|
||||
static const char *ip4prefix = "::ffff:";
|
||||
@ -414,6 +431,38 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
|
||||
throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress");
|
||||
return JNI_ERR;
|
||||
}
|
||||
jclass nativeDatagramPacketCls = (*env)->FindClass(env, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket");
|
||||
if (nativeDatagramPacketCls == NULL) {
|
||||
// pending exception...
|
||||
return JNI_ERR;
|
||||
}
|
||||
|
||||
packetAddrFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "addr", "[B");
|
||||
if (packetAddrFieldId == NULL) {
|
||||
throwRuntimeException(env, "Unable to obtain addr field for NativeDatagramPacket");
|
||||
return JNI_ERR;
|
||||
}
|
||||
packetScopeIdFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "scopeId", "I");
|
||||
if (packetScopeIdFieldId == NULL) {
|
||||
throwRuntimeException(env, "Unable to obtain scopeId field for NativeDatagramPacket");
|
||||
return JNI_ERR;
|
||||
}
|
||||
packetPortFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "port", "I");
|
||||
if (packetPortFieldId == NULL) {
|
||||
throwRuntimeException(env, "Unable to obtain port field for NativeDatagramPacket");
|
||||
return JNI_ERR;
|
||||
}
|
||||
packetMemoryAddressFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "memoryAddress", "J");
|
||||
if (packetMemoryAddressFieldId == NULL) {
|
||||
throwRuntimeException(env, "Unable to obtain memoryAddress field for NativeDatagramPacket");
|
||||
return JNI_ERR;
|
||||
}
|
||||
|
||||
packetCountFieldId = (*env)->GetFieldID(env, nativeDatagramPacketCls, "count", "I");
|
||||
if (packetCountFieldId == NULL) {
|
||||
throwRuntimeException(env, "Unable to obtain count field for NativeDatagramPacket");
|
||||
return JNI_ERR;
|
||||
}
|
||||
return JNI_VERSION_1_6;
|
||||
}
|
||||
}
|
||||
@ -690,6 +739,53 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv
|
||||
return (jint) res;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendmmsg(JNIEnv * env, jclass clazz, jint fd, jobjectArray packets, jint offset, jint len) {
|
||||
struct mmsghdr msg[len];
|
||||
int i;
|
||||
|
||||
memset(msg, 0, sizeof(msg));
|
||||
|
||||
for (i = 0; i < len; i++) {
|
||||
struct sockaddr_storage addr;
|
||||
|
||||
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
|
||||
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);
|
||||
jint scopeId = (*env)->GetIntField(env, packet, packetScopeIdFieldId);
|
||||
jint port = (*env)->GetIntField(env, packet, packetPortFieldId);
|
||||
|
||||
if (init_sockaddr(env, address, scopeId, port, &addr) == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
msg[i].msg_hdr.msg_name = &addr;
|
||||
msg[i].msg_hdr.msg_namelen = sizeof(addr);
|
||||
|
||||
msg[i].msg_hdr.msg_iov = (struct iovec *) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId);
|
||||
msg[i].msg_hdr.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId);;
|
||||
}
|
||||
|
||||
ssize_t res;
|
||||
int err;
|
||||
do {
|
||||
res = sendmmsg(fd, msg, len, 0);
|
||||
// keep on writing if it was interrupted
|
||||
} while(res == -1 && ((err = errno) == EINTR));
|
||||
|
||||
if (res < 0) {
|
||||
// network stack saturated... try again later
|
||||
if (err == EAGAIN || err == EWOULDBLOCK) {
|
||||
return 0;
|
||||
}
|
||||
if (err == EBADF) {
|
||||
throwClosedChannelException(env);
|
||||
return -1;
|
||||
}
|
||||
throwIOException(env, exceptionMessage("Error while sendmmsg(...): ", err));
|
||||
return -1;
|
||||
}
|
||||
return (jint) res;
|
||||
}
|
||||
|
||||
jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) {
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t addrlen = sizeof(addr);
|
||||
@ -1226,3 +1322,16 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz) {
|
||||
return IOV_MAX;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_uioMaxIov(JNIEnv *env, jclass clazz) {
|
||||
return UIO_MAXIOV;
|
||||
}
|
||||
|
||||
|
||||
JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv *env, jclass clazz) {
|
||||
if (sendmmsg) {
|
||||
return JNI_TRUE;
|
||||
}
|
||||
return JNI_FALSE;
|
||||
}
|
||||
|
||||
|
@ -33,6 +33,11 @@
|
||||
#define IOV_MAX 1024
|
||||
#endif /* IOV_MAX */
|
||||
|
||||
// Define UIO_MAXIOV if not found
|
||||
#ifndef UIO_MAXIOV
|
||||
#define UIO_MAXIOV 1024
|
||||
#endif /* UIO_MAXIOV */
|
||||
|
||||
jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz);
|
||||
void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value);
|
||||
void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd);
|
||||
@ -48,6 +53,7 @@ jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass cl
|
||||
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
||||
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
||||
jint Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port);
|
||||
jint Java_io_netty_channel_epoll_Native_sendmmsg(JNIEnv * env, jclass clazz, jint fd, jobjectArray packets, jint offset, jint len);
|
||||
|
||||
jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
|
||||
jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
|
||||
@ -95,3 +101,5 @@ jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv *env, jclass clazz,
|
||||
|
||||
jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz);
|
||||
jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv *env, jclass clazz);
|
||||
jint Java_io_netty_channel_epoll_Native_uioMaxIov(JNIEnv *env, jclass clazz);
|
||||
jboolean Java_io_netty_channel_epoll_Native_isSupportingSendmmsg(JNIEnv *env, jclass clazz);
|
||||
|
@ -266,6 +266,32 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
}
|
||||
|
||||
try {
|
||||
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
|
||||
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
|
||||
NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
|
||||
int cnt = array.count();
|
||||
|
||||
if (cnt >= 1) {
|
||||
// Try to use gathering writes via sendmmsg(...) syscall.
|
||||
int offset = 0;
|
||||
NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
|
||||
|
||||
while (cnt > 0) {
|
||||
int send = Native.sendmmsg(fd, packets, offset, cnt);
|
||||
if (send == 0) {
|
||||
// Did not write all messages.
|
||||
setEpollOut();
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < send; i++) {
|
||||
in.remove();
|
||||
}
|
||||
cnt -= send;
|
||||
offset += send;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
boolean done = false;
|
||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
|
||||
if (doWriteMessage(msg)) {
|
||||
@ -322,7 +348,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(),
|
||||
remoteAddress.getAddress(), remoteAddress.getPort());
|
||||
} else if (data instanceof CompositeByteBuf) {
|
||||
IovArray array = IovArray.get((CompositeByteBuf) data);
|
||||
IovArray array = IovArrayThreadLocal.get((CompositeByteBuf) data);
|
||||
int cnt = array.count();
|
||||
assert cnt != 0;
|
||||
|
||||
|
@ -358,7 +358,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
|
||||
if (PlatformDependent.hasUnsafe()) {
|
||||
// this means we can cast to IovArray and write the IovArray directly.
|
||||
IovArray array = IovArray.get(in);
|
||||
IovArray array = IovArrayThreadLocal.get(in);
|
||||
int cnt = array.count();
|
||||
if (cnt >= 1) {
|
||||
// TODO: Handle the case where cnt == 1 specially.
|
||||
|
@ -17,9 +17,7 @@ package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
@ -52,37 +50,30 @@ final class IovArray implements MessageProcessor {
|
||||
*/
|
||||
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
|
||||
|
||||
/** The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified
|
||||
/**
|
||||
* The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified
|
||||
* the maximum number of {@code iovec} structs that can be passed to {@code writev(...)}.
|
||||
*/
|
||||
private static final int CAPACITY = Native.IOV_MAX * IOV_SIZE;
|
||||
|
||||
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
|
||||
@Override
|
||||
protected IovArray initialValue() throws Exception {
|
||||
return new IovArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onRemoval(IovArray value) throws Exception {
|
||||
// free the direct memory now
|
||||
PlatformDependent.freeMemory(value.memoryAddress);
|
||||
}
|
||||
};
|
||||
|
||||
private final long memoryAddress;
|
||||
private int count;
|
||||
private long size;
|
||||
|
||||
private IovArray() {
|
||||
IovArray() {
|
||||
memoryAddress = PlatformDependent.allocateMemory(CAPACITY);
|
||||
}
|
||||
|
||||
void clear() {
|
||||
count = 0;
|
||||
size = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to add the given {@link ByteBuf}. Returns {@code true} on success,
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
private boolean add(ByteBuf buf) {
|
||||
boolean add(ByteBuf buf) {
|
||||
if (count == Native.IOV_MAX) {
|
||||
// No more room!
|
||||
return false;
|
||||
@ -124,7 +115,11 @@ final class IovArray implements MessageProcessor {
|
||||
size += len;
|
||||
}
|
||||
|
||||
private boolean add(CompositeByteBuf buf) {
|
||||
/**
|
||||
* Try to add the given {@link CompositeByteBuf}. Returns {@code true} on success,
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
boolean add(CompositeByteBuf buf) {
|
||||
ByteBuffer[] buffers = buf.nioBuffers();
|
||||
if (count + buffers.length >= Native.IOV_MAX) {
|
||||
// No more room!
|
||||
@ -196,6 +191,13 @@ final class IovArray implements MessageProcessor {
|
||||
return memoryAddress + IOV_SIZE * offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the {@link IovArray}. Once release further using of it may crash the JVM!
|
||||
*/
|
||||
void release() {
|
||||
PlatformDependent.freeMemory(memoryAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processMessage(Object msg) throws Exception {
|
||||
if (msg instanceof ByteBuf) {
|
||||
@ -207,23 +209,4 @@ final class IovArray implements MessageProcessor {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}.
|
||||
*/
|
||||
static IovArray get(ChannelOutboundBuffer buffer) throws Exception {
|
||||
IovArray array = ARRAY.get();
|
||||
array.size = 0;
|
||||
array.count = 0;
|
||||
buffer.forEachFlushedMessage(array);
|
||||
return array;
|
||||
}
|
||||
|
||||
static IovArray get(CompositeByteBuf buf) throws Exception {
|
||||
IovArray array = ARRAY.get();
|
||||
array.size = 0;
|
||||
array.count = 0;
|
||||
array.processMessage(buf);
|
||||
return array;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
|
||||
/**
|
||||
* Allow to obtain {@link IovArray} instances.
|
||||
*/
|
||||
final class IovArrayThreadLocal {
|
||||
|
||||
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
|
||||
@Override
|
||||
protected IovArray initialValue() throws Exception {
|
||||
return new IovArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onRemoval(IovArray value) throws Exception {
|
||||
// free the direct memory now
|
||||
value.release();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}.
|
||||
*/
|
||||
static IovArray get(ChannelOutboundBuffer buffer) throws Exception {
|
||||
IovArray array = ARRAY.get();
|
||||
array.clear();
|
||||
buffer.forEachFlushedMessage(array);
|
||||
return array;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link IovArray} which is filled with the {@link CompositeByteBuf}.
|
||||
*/
|
||||
static IovArray get(CompositeByteBuf buf) throws Exception {
|
||||
IovArray array = ARRAY.get();
|
||||
array.clear();
|
||||
array.add(buf);
|
||||
return array;
|
||||
}
|
||||
|
||||
private IovArrayThreadLocal() { }
|
||||
}
|
@ -51,7 +51,9 @@ final class Native {
|
||||
public static final int EPOLLOUT = 0x02;
|
||||
public static final int EPOLLACCEPT = 0x04;
|
||||
public static final int EPOLLRDHUP = 0x08;
|
||||
public static int IOV_MAX = iovMax();
|
||||
public static final int IOV_MAX = iovMax();
|
||||
public static final int UIO_MAX_IOV = uioMaxIov();
|
||||
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();
|
||||
|
||||
public static native int eventFd();
|
||||
public static native void eventFdWrite(int fd, long value);
|
||||
@ -144,6 +146,11 @@ final class Native {
|
||||
public static native EpollDatagramChannel.DatagramSocketAddress recvFromAddress(
|
||||
int fd, long memoryAddress, int pos, int limit) throws IOException;
|
||||
|
||||
public static native int sendmmsg(
|
||||
int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len) throws IOException;
|
||||
|
||||
private static native boolean isSupportingSendmmsg();
|
||||
|
||||
// socket operations
|
||||
public static int socketStreamFd() {
|
||||
try {
|
||||
@ -168,7 +175,7 @@ final class Native {
|
||||
bind(fd, address.address, address.scopeId, port);
|
||||
}
|
||||
|
||||
private static byte[] ipv4MappedIpv6Address(byte[] ipv4) {
|
||||
static byte[] ipv4MappedIpv6Address(byte[] ipv4) {
|
||||
byte[] address = new byte[16];
|
||||
System.arraycopy(IPV4_MAPPED_IPV6_PREFIX, 0, address, 0, IPV4_MAPPED_IPV6_PREFIX.length);
|
||||
System.arraycopy(ipv4, 0, address, 12, ipv4.length);
|
||||
@ -246,6 +253,8 @@ final class Native {
|
||||
|
||||
private static native int iovMax();
|
||||
|
||||
private static native int uioMaxIov();
|
||||
|
||||
private Native() {
|
||||
// utility
|
||||
}
|
||||
|
@ -0,0 +1,158 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* Support <a href="http://linux.die.net/man/2/sendmmsg">sendmmsg(...)</a> on linux with GLIBC 2.14+
|
||||
*/
|
||||
final class NativeDatagramPacketArray implements ChannelOutboundBuffer.MessageProcessor {
|
||||
|
||||
private static final FastThreadLocal<NativeDatagramPacketArray> ARRAY =
|
||||
new FastThreadLocal<NativeDatagramPacketArray>() {
|
||||
@Override
|
||||
protected NativeDatagramPacketArray initialValue() throws Exception {
|
||||
return new NativeDatagramPacketArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onRemoval(NativeDatagramPacketArray value) throws Exception {
|
||||
NativeDatagramPacket[] array = value.packets;
|
||||
// Release all packets
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
array[i].release();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Use UIO_MAX_IOV as this is the maximum number we can write with one sendmmsg(...) call.
|
||||
private final NativeDatagramPacket[] packets = new NativeDatagramPacket[Native.UIO_MAX_IOV];
|
||||
private int count;
|
||||
|
||||
private NativeDatagramPacketArray() {
|
||||
for (int i = 0; i < packets.length; i++) {
|
||||
packets[i] = new NativeDatagramPacket();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to add the given {@link DatagramPacket}. Returns {@code true} on success,
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
boolean add(DatagramPacket packet) {
|
||||
if (count == packets.length) {
|
||||
return false;
|
||||
}
|
||||
ByteBuf content = packet.content();
|
||||
int len = content.readableBytes();
|
||||
if (len == 0) {
|
||||
return true;
|
||||
}
|
||||
NativeDatagramPacket p = packets[count];
|
||||
InetSocketAddress recipient = packet.recipient();
|
||||
if (!p.init(content, recipient)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
count++;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processMessage(Object msg) throws Exception {
|
||||
return msg instanceof DatagramPacket && add((DatagramPacket) msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the count
|
||||
*/
|
||||
int count() {
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an array with {@link #count()} {@link NativeDatagramPacket}s filled.
|
||||
*/
|
||||
NativeDatagramPacket[] packets() {
|
||||
return packets;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of
|
||||
* {@link ChannelOutboundBuffer}.
|
||||
*/
|
||||
static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception {
|
||||
NativeDatagramPacketArray array = ARRAY.get();
|
||||
array.count = 0;
|
||||
buffer.forEachFlushedMessage(array);
|
||||
return array;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to pass needed data to JNI.
|
||||
*/
|
||||
@SuppressWarnings("unused")
|
||||
static final class NativeDatagramPacket {
|
||||
// Each NativeDatagramPackets holds a IovArray which is used for gathering writes.
|
||||
// This is ok as NativeDatagramPacketArray is always obtained via a FastThreadLocal and
|
||||
// so the memory needed is quite small anyway.
|
||||
private final IovArray array = new IovArray();
|
||||
|
||||
// This is the actual struct iovec*
|
||||
private long memoryAddress;
|
||||
private int count;
|
||||
|
||||
private byte[] addr;
|
||||
private int scopeId;
|
||||
private int port;
|
||||
|
||||
private void release() {
|
||||
array.release();
|
||||
}
|
||||
|
||||
/**
|
||||
* Init this instance and return {@code true} if the init was successful.
|
||||
*/
|
||||
private boolean init(ByteBuf buf, InetSocketAddress recipient) {
|
||||
array.clear();
|
||||
if (!array.add(buf)) {
|
||||
return false;
|
||||
}
|
||||
// always start from offset 0
|
||||
memoryAddress = array.memoryAddress(0);
|
||||
count = array.count();
|
||||
|
||||
InetAddress address = recipient.getAddress();
|
||||
if (address instanceof Inet6Address) {
|
||||
addr = address.getAddress();
|
||||
scopeId = ((Inet6Address) address).getScopeId();
|
||||
} else {
|
||||
addr = Native.ipv4MappedIpv6Address(address.getAddress());
|
||||
scopeId = 0;
|
||||
}
|
||||
port = recipient.getPort();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user