Allow to write CompositeByteBuf directly via EpollDatagramChannel. Related to [#2719]
Motivation: On linux it is possible to use the sendMsg(...) system call to write multiple buffers with one system call when using datagram/udp. Modifications: - Implement the needed changes and make use of sendMsg(...) if possible for max performance - Add tests that test sending datagram packets with all kind of different ByteBuf implementations. Result: Performance improvement when using CompoisteByteBuf and EpollDatagramChannel.
This commit is contained in:
parent
cb1bf1a74e
commit
646753c2eb
@ -16,6 +16,8 @@
|
||||
package io.netty.testsuite.transport.socket;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
@ -32,11 +34,61 @@ import static org.junit.Assert.*;
|
||||
public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
|
||||
@Test
|
||||
public void testSimpleSend() throws Throwable {
|
||||
public void testSimpleSendDirectByteBuf() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testSimpleSend(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSendHeapByteBuf() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSendCompositeDirectByteBuf() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||
buf.addComponent(Unpooled.directBuffer(2, 2));
|
||||
buf.addComponent(Unpooled.directBuffer(2, 2));
|
||||
testSimpleSend0(sb, cb, buf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSendCompositeHeapByteBuf() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||
buf.addComponent(Unpooled.buffer(2, 2));
|
||||
buf.addComponent(Unpooled.buffer(2, 2));
|
||||
testSimpleSend0(sb, cb, buf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSendCompositeMixedByteBuf() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||
buf.addComponent(Unpooled.directBuffer(2, 2));
|
||||
buf.addComponent(Unpooled.buffer(2, 2));
|
||||
testSimpleSend0(sb, cb, buf);
|
||||
}
|
||||
|
||||
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf) throws Throwable {
|
||||
buf.writeInt(1);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
|
||||
@ -57,7 +109,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
Channel sc = sb.bind().sync().channel();
|
||||
Channel cc = cb.bind().sync().channel();
|
||||
|
||||
cc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), addr)).sync();
|
||||
cc.writeAndFlush(new DatagramPacket(buf, addr)).sync();
|
||||
assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||
|
||||
sc.close().sync();
|
||||
|
@ -655,6 +655,41 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv *
|
||||
return sendTo0(env, fd, (void*) memoryAddress, pos, limit, address, scopeId, port);
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) {
|
||||
struct sockaddr_storage addr;
|
||||
|
||||
if (init_sockaddr(env, address, scopeId, port, &addr) == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct msghdr m;
|
||||
m.msg_name = (void*) &addr;
|
||||
m.msg_namelen = (socklen_t) sizeof(struct sockaddr_storage);
|
||||
m.msg_iov = (struct iovec *) memoryAddress;
|
||||
m.msg_iovlen = length;
|
||||
|
||||
ssize_t res;
|
||||
int err;
|
||||
do {
|
||||
res = sendmsg(fd, &m, 0);
|
||||
// keep on writing if it was interrupted
|
||||
} while(res == -1 && ((err = errno) == EINTR));
|
||||
|
||||
if (res < 0) {
|
||||
// network stack saturated... try again later
|
||||
if (err == EAGAIN || err == EWOULDBLOCK) {
|
||||
return 0;
|
||||
}
|
||||
if (err == EBADF) {
|
||||
throwClosedChannelException(env);
|
||||
return -1;
|
||||
}
|
||||
throwIOException(env, exceptionMessage("Error while sendto(...): ", err));
|
||||
return -1;
|
||||
}
|
||||
return (jint) res;
|
||||
}
|
||||
|
||||
jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) {
|
||||
struct sockaddr_storage addr;
|
||||
socklen_t addrlen = sizeof(addr);
|
||||
|
@ -47,6 +47,7 @@ jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint
|
||||
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length);
|
||||
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
||||
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
||||
jint Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port);
|
||||
|
||||
jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
|
||||
jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
|
||||
|
@ -16,6 +16,7 @@
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.AddressedEnvelope;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
@ -28,6 +29,7 @@ import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.socket.DatagramChannel;
|
||||
import io.netty.channel.socket.DatagramChannelConfig;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -288,7 +290,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
}
|
||||
}
|
||||
|
||||
private boolean doWriteMessage(Object msg) throws IOException {
|
||||
private boolean doWriteMessage(Object msg) throws Exception {
|
||||
final ByteBuf data;
|
||||
InetSocketAddress remoteAddress;
|
||||
if (msg instanceof AddressedEnvelope) {
|
||||
@ -319,6 +321,13 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
long memoryAddress = data.memoryAddress();
|
||||
writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(),
|
||||
remoteAddress.getAddress(), remoteAddress.getPort());
|
||||
} else if (data instanceof CompositeByteBuf) {
|
||||
IovArray array = IovArray.get((CompositeByteBuf) data);
|
||||
int cnt = array.count();
|
||||
assert cnt != 0;
|
||||
|
||||
writtenBytes = Native.sendToAddresses(fd, array.memoryAddress(0),
|
||||
cnt, remoteAddress.getAddress(), remoteAddress.getPort());
|
||||
} else {
|
||||
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
|
||||
writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(),
|
||||
@ -344,13 +353,24 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (buf.hasMemoryAddress()) {
|
||||
return msg;
|
||||
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
||||
if (buf instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) buf;
|
||||
if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
|
||||
// more then 1024 buffers for gathering writes so just do a memory copy.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
|
||||
// We can only handle direct buffers so we need to copy if a non direct is
|
||||
} else {
|
||||
// We can only handle buffers with memory address so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
return newDirectBuffer(buf);
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
if (msg instanceof AddressedEnvelope) {
|
||||
@ -363,7 +383,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
if (content.hasMemoryAddress()) {
|
||||
return e;
|
||||
}
|
||||
|
||||
if (content instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) content;
|
||||
if (comp.isDirect() && comp.nioBufferCount() <= Native.IOV_MAX) {
|
||||
return e;
|
||||
}
|
||||
}
|
||||
// We can only handle direct buffers so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
|
||||
|
@ -218,4 +218,12 @@ final class IovArray implements MessageProcessor {
|
||||
buffer.forEachFlushedMessage(array);
|
||||
return array;
|
||||
}
|
||||
|
||||
static IovArray get(CompositeByteBuf buf) throws Exception {
|
||||
IovArray array = ARRAY.get();
|
||||
array.size = 0;
|
||||
array.count = 0;
|
||||
array.processMessage(buf);
|
||||
return array;
|
||||
}
|
||||
}
|
||||
|
@ -118,6 +118,26 @@ final class Native {
|
||||
private static native int sendToAddress(
|
||||
int fd, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port) throws IOException;
|
||||
|
||||
public static int sendToAddresses(
|
||||
int fd, long memoryAddress, int length, InetAddress addr, int port) throws IOException {
|
||||
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
|
||||
// to be called frequently
|
||||
byte[] address;
|
||||
int scopeId;
|
||||
if (addr instanceof Inet6Address) {
|
||||
address = addr.getAddress();
|
||||
scopeId = ((Inet6Address) addr).getScopeId();
|
||||
} else {
|
||||
// convert to ipv4 mapped ipv6 address;
|
||||
scopeId = 0;
|
||||
address = ipv4MappedIpv6Address(addr.getAddress());
|
||||
}
|
||||
return sendToAddresses(fd, memoryAddress, length, address, scopeId, port);
|
||||
}
|
||||
|
||||
private static native int sendToAddresses(
|
||||
int fd, long memoryAddress, int length, byte[] address, int scopeId, int port) throws IOException;
|
||||
|
||||
public static native EpollDatagramChannel.DatagramSocketAddress recvFrom(
|
||||
int fd, ByteBuffer buf, int pos, int limit) throws IOException;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user