Allow to write CompositeByteBuf directly via EpollDatagramChannel. Related to [#2719]

Motivation:

On linux it is possible to use the sendMsg(...) system call to write multiple buffers with one system call when using datagram/udp.

Modifications:

- Implement the needed changes and make use of sendMsg(...) if possible for max performance
- Add tests that test sending datagram packets with all kind of different ByteBuf implementations.

Result:

Performance improvement when using CompoisteByteBuf and EpollDatagramChannel.
This commit is contained in:
Norman Maurer 2014-08-25 08:47:00 +02:00
parent 7e48801f71
commit f1f14f524a
6 changed files with 154 additions and 11 deletions

View File

@ -16,6 +16,8 @@
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@ -32,11 +34,61 @@ import static org.junit.Assert.*;
public class DatagramUnicastTest extends AbstractDatagramTest {
@Test
public void testSimpleSend() throws Throwable {
public void testSimpleSendDirectByteBuf() throws Throwable {
run();
}
public void testSimpleSend(Bootstrap sb, Bootstrap cb) throws Throwable {
public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
testSimpleSend0(sb, cb, Unpooled.directBuffer());
}
@Test
public void testSimpleSendHeapByteBuf() throws Throwable {
run();
}
public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
testSimpleSend0(sb, cb, Unpooled.directBuffer());
}
@Test
public void testSimpleSendCompositeDirectByteBuf() throws Throwable {
run();
}
public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.directBuffer(2, 2));
buf.addComponent(Unpooled.directBuffer(2, 2));
testSimpleSend0(sb, cb, buf);
}
@Test
public void testSimpleSendCompositeHeapByteBuf() throws Throwable {
run();
}
public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.buffer(2, 2));
buf.addComponent(Unpooled.buffer(2, 2));
testSimpleSend0(sb, cb, buf);
}
@Test
public void testSimpleSendCompositeMixedByteBuf() throws Throwable {
run();
}
public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.directBuffer(2, 2));
buf.addComponent(Unpooled.buffer(2, 2));
testSimpleSend0(sb, cb, buf);
}
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf) throws Throwable {
buf.writeInt(1);
final CountDownLatch latch = new CountDownLatch(1);
sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@ -57,7 +109,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
Channel sc = sb.bind().sync().channel();
Channel cc = cb.bind().sync().channel();
cc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), addr)).sync();
cc.writeAndFlush(new DatagramPacket(buf, addr)).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
sc.close().sync();

View File

@ -655,6 +655,41 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv *
return sendTo0(env, fd, (void*) memoryAddress, pos, limit, address, scopeId, port);
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port) {
struct sockaddr_storage addr;
if (init_sockaddr(env, address, scopeId, port, &addr) == -1) {
return -1;
}
struct msghdr m;
m.msg_name = (void*) &addr;
m.msg_namelen = (socklen_t) sizeof(struct sockaddr_storage);
m.msg_iov = (struct iovec *) memoryAddress;
m.msg_iovlen = length;
ssize_t res;
int err;
do {
res = sendmsg(fd, &m, 0);
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));
if (res < 0) {
// network stack saturated... try again later
if (err == EAGAIN || err == EWOULDBLOCK) {
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while sendto(...): ", err));
return -1;
}
return (jint) res;
}
jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);

View File

@ -47,6 +47,7 @@ jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length);
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_sendToAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);

View File

@ -16,6 +16,7 @@
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
@ -28,6 +29,7 @@ import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
@ -288,7 +290,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
}
private boolean doWriteMessage(Object msg) throws IOException {
private boolean doWriteMessage(Object msg) throws Exception {
final ByteBuf data;
InetSocketAddress remoteAddress;
if (msg instanceof AddressedEnvelope) {
@ -319,6 +321,13 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
long memoryAddress = data.memoryAddress();
writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
} else if (data instanceof CompositeByteBuf) {
IovArray array = IovArray.get((CompositeByteBuf) data);
int cnt = array.count();
assert cnt != 0;
writtenBytes = Native.sendToAddresses(fd, array.memoryAddress(0),
cnt, remoteAddress.getAddress(), remoteAddress.getPort());
} else {
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(),
@ -344,13 +353,24 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.hasMemoryAddress()) {
return msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
if (buf instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) buf;
if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
// more then 1024 buffers for gathering writes so just do a memory copy.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
} else {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return newDirectBuffer(buf);
return buf;
}
if (msg instanceof AddressedEnvelope) {
@ -363,7 +383,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (content.hasMemoryAddress()) {
return e;
}
if (content instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) content;
if (comp.isDirect() && comp.nioBufferCount() <= Native.IOV_MAX) {
return e;
}
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(

View File

@ -218,4 +218,12 @@ final class IovArray implements MessageProcessor {
buffer.forEachFlushedMessage(array);
return array;
}
static IovArray get(CompositeByteBuf buf) throws Exception {
IovArray array = ARRAY.get();
array.size = 0;
array.count = 0;
array.processMessage(buf);
return array;
}
}

View File

@ -118,6 +118,26 @@ final class Native {
private static native int sendToAddress(
int fd, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port) throws IOException;
public static int sendToAddresses(
int fd, long memoryAddress, int length, InetAddress addr, int port) throws IOException {
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
// to be called frequently
byte[] address;
int scopeId;
if (addr instanceof Inet6Address) {
address = addr.getAddress();
scopeId = ((Inet6Address) addr).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
return sendToAddresses(fd, memoryAddress, length, address, scopeId, port);
}
private static native int sendToAddresses(
int fd, long memoryAddress, int length, byte[] address, int scopeId, int port) throws IOException;
public static native EpollDatagramChannel.DatagramSocketAddress recvFrom(
int fd, ByteBuffer buf, int pos, int limit) throws IOException;