Allow to write CompositeByteBuf directly via EpollDatagramChannel. Related to [#2719]
Motivation: On linux it is possible to use the sendMsg(...) system call to write multiple buffers with one system call when using datagram/udp. Modifications: - Implement the needed changes and make use of sendMsg(...) if possible for max performance - Add tests that test sending datagram packets with all kind of different ByteBuf implementations. Result: Performance improvement when using CompoisteByteBuf and EpollDatagramChannel.
This commit is contained in:
parent
6d1b96fb63
commit
07876ebf68
@ -16,6 +16,8 @@
|
|||||||
package io.netty.testsuite.transport.socket;
|
package io.netty.testsuite.transport.socket;
|
||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
@ -32,11 +34,61 @@ import static org.junit.Assert.*;
|
|||||||
public class DatagramUnicastTest extends AbstractDatagramTest {
|
public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleSend() throws Throwable {
|
public void testSimpleSendDirectByteBuf() throws Throwable {
|
||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSimpleSend(Bootstrap sb, Bootstrap cb) throws Throwable {
|
public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
|
testSimpleSend0(sb, cb, Unpooled.directBuffer());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleSendHeapByteBuf() throws Throwable {
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
|
testSimpleSend0(sb, cb, Unpooled.directBuffer());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleSendCompositeDirectByteBuf() throws Throwable {
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
|
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||||
|
buf.addComponent(Unpooled.directBuffer(2, 2));
|
||||||
|
buf.addComponent(Unpooled.directBuffer(2, 2));
|
||||||
|
testSimpleSend0(sb, cb, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleSendCompositeHeapByteBuf() throws Throwable {
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
|
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||||
|
buf.addComponent(Unpooled.buffer(2, 2));
|
||||||
|
buf.addComponent(Unpooled.buffer(2, 2));
|
||||||
|
testSimpleSend0(sb, cb, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleSendCompositeMixedByteBuf() throws Throwable {
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
|
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||||
|
buf.addComponent(Unpooled.directBuffer(2, 2));
|
||||||
|
buf.addComponent(Unpooled.buffer(2, 2));
|
||||||
|
testSimpleSend0(sb, cb, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf) throws Throwable {
|
||||||
|
buf.writeInt(1);
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
|
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
|
||||||
@ -57,7 +109,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
|||||||
Channel sc = sb.bind().sync().channel();
|
Channel sc = sb.bind().sync().channel();
|
||||||
Channel cc = cb.bind().sync().channel();
|
Channel cc = cb.bind().sync().channel();
|
||||||
|
|
||||||
cc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), addr)).sync();
|
cc.writeAndFlush(new DatagramPacket(buf, addr)).sync();
|
||||||
assertTrue(latch.await(10, TimeUnit.SECONDS));
|
assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
sc.close().sync();
|
sc.close().sync();
|
||||||
|
@ -655,6 +655,41 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv *
|
|||||||
return sendTo0(env, fd, (void*) memoryAddress, pos, limit, address, scopeId, port);
|
return sendTo0(env, fd, (void*) memoryAddress, pos, limit, address, scopeId, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) {
|
||||||
|
struct sockaddr_storage addr;
|
||||||
|
|
||||||
|
if (init_sockaddr(env, address, scopeId, port, &addr) == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct msghdr m;
|
||||||
|
m.msg_name = (void*) &addr;
|
||||||
|
m.msg_namelen = (socklen_t) sizeof(struct sockaddr_storage);
|
||||||
|
m.msg_iov = (struct iovec *) memoryAddress;
|
||||||
|
m.msg_iovlen = length;
|
||||||
|
|
||||||
|
ssize_t res;
|
||||||
|
int err;
|
||||||
|
do {
|
||||||
|
res = sendmsg(fd, &m, 0);
|
||||||
|
// keep on writing if it was interrupted
|
||||||
|
} while(res == -1 && ((err = errno) == EINTR));
|
||||||
|
|
||||||
|
if (res < 0) {
|
||||||
|
// network stack saturated... try again later
|
||||||
|
if (err == EAGAIN || err == EWOULDBLOCK) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (err == EBADF) {
|
||||||
|
throwClosedChannelException(env);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
throwIOException(env, exceptionMessage("Error while sendto(...): ", err));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return (jint) res;
|
||||||
|
}
|
||||||
|
|
||||||
jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) {
|
jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) {
|
||||||
struct sockaddr_storage addr;
|
struct sockaddr_storage addr;
|
||||||
socklen_t addrlen = sizeof(addr);
|
socklen_t addrlen = sizeof(addr);
|
||||||
|
@ -47,6 +47,7 @@ jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint
|
|||||||
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length);
|
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length);
|
||||||
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
||||||
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
||||||
|
jint Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port);
|
||||||
|
|
||||||
jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
|
jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
|
||||||
jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
|
jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
package io.netty.channel.epoll;
|
package io.netty.channel.epoll;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
import io.netty.channel.AddressedEnvelope;
|
import io.netty.channel.AddressedEnvelope;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelMetadata;
|
import io.netty.channel.ChannelMetadata;
|
||||||
@ -28,6 +29,7 @@ import io.netty.channel.RecvByteBufAllocator;
|
|||||||
import io.netty.channel.socket.DatagramChannel;
|
import io.netty.channel.socket.DatagramChannel;
|
||||||
import io.netty.channel.socket.DatagramChannelConfig;
|
import io.netty.channel.socket.DatagramChannelConfig;
|
||||||
import io.netty.channel.socket.DatagramPacket;
|
import io.netty.channel.socket.DatagramPacket;
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -288,7 +290,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean doWriteMessage(Object msg) throws IOException {
|
private boolean doWriteMessage(Object msg) throws Exception {
|
||||||
final ByteBuf data;
|
final ByteBuf data;
|
||||||
InetSocketAddress remoteAddress;
|
InetSocketAddress remoteAddress;
|
||||||
if (msg instanceof AddressedEnvelope) {
|
if (msg instanceof AddressedEnvelope) {
|
||||||
@ -319,6 +321,13 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
long memoryAddress = data.memoryAddress();
|
long memoryAddress = data.memoryAddress();
|
||||||
writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(),
|
writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(),
|
||||||
remoteAddress.getAddress(), remoteAddress.getPort());
|
remoteAddress.getAddress(), remoteAddress.getPort());
|
||||||
|
} else if (data instanceof CompositeByteBuf) {
|
||||||
|
IovArray array = IovArray.get((CompositeByteBuf) data);
|
||||||
|
int cnt = array.count();
|
||||||
|
assert cnt != 0;
|
||||||
|
|
||||||
|
writtenBytes = Native.sendToAddresses(fd, array.memoryAddress(0),
|
||||||
|
cnt, remoteAddress.getAddress(), remoteAddress.getPort());
|
||||||
} else {
|
} else {
|
||||||
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
|
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
|
||||||
writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(),
|
writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(),
|
||||||
@ -344,13 +353,24 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
|
|
||||||
if (msg instanceof ByteBuf) {
|
if (msg instanceof ByteBuf) {
|
||||||
ByteBuf buf = (ByteBuf) msg;
|
ByteBuf buf = (ByteBuf) msg;
|
||||||
if (buf.hasMemoryAddress()) {
|
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
||||||
return msg;
|
if (buf instanceof CompositeByteBuf) {
|
||||||
|
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||||
|
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||||
|
CompositeByteBuf comp = (CompositeByteBuf) buf;
|
||||||
|
if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
|
||||||
|
// more then 1024 buffers for gathering writes so just do a memory copy.
|
||||||
|
buf = newDirectBuffer(buf);
|
||||||
|
assert buf.hasMemoryAddress();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// We can only handle buffers with memory address so we need to copy if a non direct is
|
||||||
|
// passed to write.
|
||||||
|
buf = newDirectBuffer(buf);
|
||||||
|
assert buf.hasMemoryAddress();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return buf;
|
||||||
// We can only handle direct buffers so we need to copy if a non direct is
|
|
||||||
// passed to write.
|
|
||||||
return newDirectBuffer(buf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg instanceof AddressedEnvelope) {
|
if (msg instanceof AddressedEnvelope) {
|
||||||
@ -363,7 +383,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
if (content.hasMemoryAddress()) {
|
if (content.hasMemoryAddress()) {
|
||||||
return e;
|
return e;
|
||||||
}
|
}
|
||||||
|
if (content instanceof CompositeByteBuf) {
|
||||||
|
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||||
|
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||||
|
CompositeByteBuf comp = (CompositeByteBuf) content;
|
||||||
|
if (comp.isDirect() && comp.nioBufferCount() <= Native.IOV_MAX) {
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
}
|
||||||
// We can only handle direct buffers so we need to copy if a non direct is
|
// We can only handle direct buffers so we need to copy if a non direct is
|
||||||
// passed to write.
|
// passed to write.
|
||||||
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
|
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
|
||||||
|
@ -218,4 +218,12 @@ final class IovArray implements MessageProcessor {
|
|||||||
buffer.forEachFlushedMessage(array);
|
buffer.forEachFlushedMessage(array);
|
||||||
return array;
|
return array;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static IovArray get(CompositeByteBuf buf) throws Exception {
|
||||||
|
IovArray array = ARRAY.get();
|
||||||
|
array.size = 0;
|
||||||
|
array.count = 0;
|
||||||
|
array.processMessage(buf);
|
||||||
|
return array;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -118,6 +118,26 @@ final class Native {
|
|||||||
private static native int sendToAddress(
|
private static native int sendToAddress(
|
||||||
int fd, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port) throws IOException;
|
int fd, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port) throws IOException;
|
||||||
|
|
||||||
|
public static int sendToAddresses(
|
||||||
|
int fd, long memoryAddress, int length, InetAddress addr, int port) throws IOException {
|
||||||
|
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
|
||||||
|
// to be called frequently
|
||||||
|
byte[] address;
|
||||||
|
int scopeId;
|
||||||
|
if (addr instanceof Inet6Address) {
|
||||||
|
address = addr.getAddress();
|
||||||
|
scopeId = ((Inet6Address) addr).getScopeId();
|
||||||
|
} else {
|
||||||
|
// convert to ipv4 mapped ipv6 address;
|
||||||
|
scopeId = 0;
|
||||||
|
address = ipv4MappedIpv6Address(addr.getAddress());
|
||||||
|
}
|
||||||
|
return sendToAddresses(fd, memoryAddress, length, address, scopeId, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static native int sendToAddresses(
|
||||||
|
int fd, long memoryAddress, int length, byte[] address, int scopeId, int port) throws IOException;
|
||||||
|
|
||||||
public static native EpollDatagramChannel.DatagramSocketAddress recvFrom(
|
public static native EpollDatagramChannel.DatagramSocketAddress recvFrom(
|
||||||
int fd, ByteBuffer buf, int pos, int limit) throws IOException;
|
int fd, ByteBuffer buf, int pos, int limit) throws IOException;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user