Fix handle of ByteBuf with multi nioBuffer in EpollDatagramChannel and KQueueDatagramChannel
Motivation: 1. special handling of ByteBuf with multi nioBuffer rather than type of CompositeByteBuf (eg. DuplicatedByteBuf with CompositeByteBuf) 2. EpollDatagramUnicastTest and KQueueDatagramUnicastTest passed because CompositeByteBuf is converted to DuplicatedByteBuf before write to channel 3. uninitalized struct msghdr will raise error Modifications: 1. isBufferCopyNeededForWrite(like isSingleDirectBuffer in NioDatgramChannel) checks wether a new direct buffer is needed 2. special handling of ByteBuf with multi nioBuffer in EpollDatagramChannel, AbstractEpollStreamChannel, KQueueDatagramChannel, AbstractKQueueStreamChannel and IovArray 3. initalize struct msghdr Result: handle of ByteBuf with multi nioBuffer in EpollDatagramChannel and KQueueDatagramChannel are ok
This commit is contained in:
parent
b419bd152a
commit
3c4dfed08a
@ -36,14 +36,18 @@ import static org.junit.Assert.*;
|
||||
public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
|
||||
private static final byte[] BYTES = {0, 1, 2, 3};
|
||||
private enum WrapType {
|
||||
NONE, DUP, SLICE,
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSendDirectByteBuf() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 1);
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 4);
|
||||
testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 1);
|
||||
testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -52,8 +56,8 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
}
|
||||
|
||||
public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testSimpleSend0(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 1);
|
||||
testSimpleSend0(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 4);
|
||||
testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 1);
|
||||
testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -65,12 +69,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||
buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
|
||||
buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
|
||||
testSimpleSend0(sb, cb, buf, true, BYTES, 1);
|
||||
testSimpleSend(sb, cb, buf, true, BYTES, 1);
|
||||
|
||||
CompositeByteBuf buf2 = Unpooled.compositeBuffer();
|
||||
buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
|
||||
buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
|
||||
testSimpleSend0(sb, cb, buf2, true, BYTES, 4);
|
||||
testSimpleSend(sb, cb, buf2, true, BYTES, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -82,12 +86,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||
buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
|
||||
buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
|
||||
testSimpleSend0(sb, cb, buf, true, BYTES, 1);
|
||||
testSimpleSend(sb, cb, buf, true, BYTES, 1);
|
||||
|
||||
CompositeByteBuf buf2 = Unpooled.compositeBuffer();
|
||||
buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
|
||||
buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
|
||||
testSimpleSend0(sb, cb, buf2, true, BYTES, 4);
|
||||
testSimpleSend(sb, cb, buf2, true, BYTES, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -99,12 +103,12 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
CompositeByteBuf buf = Unpooled.compositeBuffer();
|
||||
buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
|
||||
buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
|
||||
testSimpleSend0(sb, cb, buf, true, BYTES, 1);
|
||||
testSimpleSend(sb, cb, buf, true, BYTES, 1);
|
||||
|
||||
CompositeByteBuf buf2 = Unpooled.compositeBuffer();
|
||||
buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
|
||||
buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
|
||||
testSimpleSend0(sb, cb, buf2, true, BYTES, 4);
|
||||
testSimpleSend(sb, cb, buf2, true, BYTES, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -113,13 +117,21 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
}
|
||||
|
||||
public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 1);
|
||||
testSimpleSend0(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 4);
|
||||
testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 1);
|
||||
testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 4);
|
||||
}
|
||||
|
||||
private void testSimpleSend(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
|
||||
final byte[] bytes, int count) throws Throwable {
|
||||
for (WrapType type: WrapType.values()) {
|
||||
testSimpleSend0(sb, cb, buf.retain(), bindClient, bytes, count, type);
|
||||
}
|
||||
assertTrue(buf.release());
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
|
||||
final byte[] bytes, int count)
|
||||
final byte[] bytes, int count, WrapType wrapType)
|
||||
throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(count);
|
||||
|
||||
@ -177,7 +189,15 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
|
||||
}
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
cc.write(new DatagramPacket(buf.retain().duplicate(), addr));
|
||||
if (wrapType == WrapType.DUP) {
|
||||
cc.write(new DatagramPacket(buf.retain().duplicate(), addr));
|
||||
} else if (wrapType == WrapType.SLICE) {
|
||||
cc.write(new DatagramPacket(buf.retain().slice(), addr));
|
||||
} else if (wrapType == WrapType.NONE) {
|
||||
cc.write(new DatagramPacket(buf.retain(), addr));
|
||||
} else {
|
||||
throw new Exception("unknown wrap type: " + wrapType);
|
||||
}
|
||||
}
|
||||
// release as we used buf.retain() before
|
||||
buf.release();
|
||||
|
@ -17,7 +17,6 @@ package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
@ -35,6 +34,7 @@ import io.netty.channel.socket.DuplexChannel;
|
||||
import io.netty.channel.unix.FileDescriptor;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.channel.unix.SocketWritableByteChannel;
|
||||
import io.netty.channel.unix.UnixChannelUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import io.netty.util.internal.ThrowableUtil;
|
||||
@ -53,7 +53,6 @@ import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static io.netty.channel.unix.FileDescriptor.pipe;
|
||||
import static io.netty.channel.unix.Limits.IOV_MAX;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
|
||||
public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
|
||||
@ -539,24 +538,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
|
||||
protected Object filterOutboundMessage(Object msg) {
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
||||
if (buf instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) buf;
|
||||
if (!comp.isDirect() || comp.nioBufferCount() > IOV_MAX) {
|
||||
// more then 1024 buffers for gathering writes so just do a memory copy.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
} else {
|
||||
// We can only handle buffers with memory address so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
}
|
||||
return buf;
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
|
||||
}
|
||||
|
||||
if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
|
||||
|
@ -17,7 +17,6 @@ package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.AddressedEnvelope;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
@ -30,6 +29,7 @@ import io.netty.channel.socket.DatagramChannelConfig;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.unix.DatagramSocketAddress;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.channel.unix.UnixChannelUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
@ -45,7 +45,6 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
|
||||
import static io.netty.channel.unix.Limits.IOV_MAX;
|
||||
|
||||
/**
|
||||
* {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
|
||||
@ -373,7 +372,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
long memoryAddress = data.memoryAddress();
|
||||
writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
|
||||
remoteAddress.getAddress(), remoteAddress.getPort());
|
||||
} else if (data instanceof CompositeByteBuf) {
|
||||
} else if (data.nioBufferCount() > 1) {
|
||||
IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
|
||||
array.add(data);
|
||||
int cnt = array.count();
|
||||
@ -395,43 +394,13 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
if (msg instanceof DatagramPacket) {
|
||||
DatagramPacket packet = (DatagramPacket) msg;
|
||||
ByteBuf content = packet.content();
|
||||
if (content.hasMemoryAddress()) {
|
||||
return msg;
|
||||
}
|
||||
|
||||
if (content.isDirect() && content instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) content;
|
||||
if (comp.isDirect() && comp.nioBufferCount() <= IOV_MAX) {
|
||||
return msg;
|
||||
}
|
||||
}
|
||||
// We can only handle direct buffers so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
return new DatagramPacket(newDirectBuffer(packet, content), packet.recipient());
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
|
||||
new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
|
||||
}
|
||||
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
||||
if (buf instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) buf;
|
||||
if (!comp.isDirect() || comp.nioBufferCount() > IOV_MAX) {
|
||||
// more then 1024 buffers for gathering writes so just do a memory copy.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
} else {
|
||||
// We can only handle buffers with memory address so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
}
|
||||
return buf;
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
|
||||
}
|
||||
|
||||
if (msg instanceof AddressedEnvelope) {
|
||||
@ -441,21 +410,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
|
||||
|
||||
ByteBuf content = (ByteBuf) e.content();
|
||||
if (content.hasMemoryAddress()) {
|
||||
return e;
|
||||
}
|
||||
if (content instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) content;
|
||||
if (comp.isDirect() && comp.nioBufferCount() <= IOV_MAX) {
|
||||
return e;
|
||||
}
|
||||
}
|
||||
// We can only handle direct buffers so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
|
||||
newDirectBuffer(e, content), (InetSocketAddress) e.recipient());
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
|
||||
new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
|
||||
newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,7 +17,6 @@ package io.netty.channel.kqueue;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
@ -33,6 +32,7 @@ import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.socket.DuplexChannel;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.channel.unix.SocketWritableByteChannel;
|
||||
import io.netty.channel.unix.UnixChannelUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import io.netty.util.internal.ThrowableUtil;
|
||||
@ -48,8 +48,6 @@ import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static io.netty.channel.unix.Limits.IOV_MAX;
|
||||
|
||||
@UnstableApi
|
||||
public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
|
||||
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
|
||||
@ -369,24 +367,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
|
||||
protected Object filterOutboundMessage(Object msg) {
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
||||
if (buf instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) buf;
|
||||
if (!comp.isDirect() || comp.nioBufferCount() > IOV_MAX) {
|
||||
// more then 1024 buffers for gathering writes so just do a memory copy.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
} else {
|
||||
// We can only handle buffers with memory address so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
}
|
||||
return buf;
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
|
||||
}
|
||||
|
||||
if (msg instanceof FileRegion) {
|
||||
|
@ -17,7 +17,6 @@ package io.netty.channel.kqueue;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.AddressedEnvelope;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
@ -30,6 +29,7 @@ import io.netty.channel.socket.DatagramChannelConfig;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.unix.DatagramSocketAddress;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.channel.unix.UnixChannelUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
@ -46,7 +46,6 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
|
||||
import static io.netty.channel.unix.Limits.IOV_MAX;
|
||||
|
||||
@UnstableApi
|
||||
public final class KQueueDatagramChannel extends AbstractKQueueChannel implements DatagramChannel {
|
||||
@ -345,7 +344,7 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
|
||||
long memoryAddress = data.memoryAddress();
|
||||
writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
|
||||
remoteAddress.getAddress(), remoteAddress.getPort());
|
||||
} else if (data instanceof CompositeByteBuf) {
|
||||
} else if (data.nioBufferCount() > 1) {
|
||||
IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
|
||||
array.add(data);
|
||||
int cnt = array.count();
|
||||
@ -367,43 +366,13 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
|
||||
if (msg instanceof DatagramPacket) {
|
||||
DatagramPacket packet = (DatagramPacket) msg;
|
||||
ByteBuf content = packet.content();
|
||||
if (content.hasMemoryAddress()) {
|
||||
return msg;
|
||||
}
|
||||
|
||||
if (content.isDirect() && content instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) content;
|
||||
if (comp.isDirect() && comp.nioBufferCount() <= IOV_MAX) {
|
||||
return msg;
|
||||
}
|
||||
}
|
||||
// We can only handle direct buffers so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
return new DatagramPacket(newDirectBuffer(packet, content), packet.recipient());
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
|
||||
new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
|
||||
}
|
||||
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
||||
if (buf instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) buf;
|
||||
if (!comp.isDirect() || comp.nioBufferCount() > IOV_MAX) {
|
||||
// more then 1024 buffers for gathering writes so just do a memory copy.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
} else {
|
||||
// We can only handle buffers with memory address so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
}
|
||||
return buf;
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
|
||||
}
|
||||
|
||||
if (msg instanceof AddressedEnvelope) {
|
||||
@ -413,21 +382,9 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
|
||||
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
|
||||
|
||||
ByteBuf content = (ByteBuf) e.content();
|
||||
if (content.hasMemoryAddress()) {
|
||||
return e;
|
||||
}
|
||||
if (content instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) content;
|
||||
if (comp.isDirect() && comp.nioBufferCount() <= IOV_MAX) {
|
||||
return e;
|
||||
}
|
||||
}
|
||||
// We can only handle direct buffers so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
|
||||
newDirectBuffer(e, content), (InetSocketAddress) e.recipient());
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
|
||||
new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
|
||||
newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright 2017 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.channel.unix.tests;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import static io.netty.channel.unix.Limits.IOV_MAX;
|
||||
import static io.netty.channel.unix.UnixChannelUtil.isBufferCopyNeededForWrite;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class UnixChannelUtilTest {
|
||||
@Test
|
||||
public void testPooledAllocatorIsBufferCopyNeededForWrite() {
|
||||
testIsBufferCopyNeededForWrite(PooledByteBufAllocator.DEFAULT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnPooledAllocatorIsBufferCopyNeededForWrite() {
|
||||
testIsBufferCopyNeededForWrite(UnpooledByteBufAllocator.DEFAULT);
|
||||
}
|
||||
|
||||
private static void testIsBufferCopyNeededForWrite(ByteBufAllocator alloc) {
|
||||
ByteBuf byteBuf = alloc.directBuffer();
|
||||
assertFalse(isBufferCopyNeededForWrite(byteBuf));
|
||||
assertTrue(byteBuf.release());
|
||||
|
||||
byteBuf = alloc.heapBuffer();
|
||||
assertTrue(isBufferCopyNeededForWrite(byteBuf));
|
||||
assertTrue(byteBuf.release());
|
||||
|
||||
assertCompositeByteBufIsBufferCopyNeededForWrite(alloc, 2, 0, false);
|
||||
assertCompositeByteBufIsBufferCopyNeededForWrite(alloc, IOV_MAX + 1, 0, true);
|
||||
assertCompositeByteBufIsBufferCopyNeededForWrite(alloc, 0, 2, true);
|
||||
assertCompositeByteBufIsBufferCopyNeededForWrite(alloc, 1, 1, true);
|
||||
}
|
||||
|
||||
private static void assertCompositeByteBufIsBufferCopyNeededForWrite(ByteBufAllocator alloc, int numDirect,
|
||||
int numHeap, boolean expected) {
|
||||
CompositeByteBuf comp = alloc.compositeBuffer(numDirect + numHeap);
|
||||
List<ByteBuf> byteBufs = new LinkedList<ByteBuf>();
|
||||
|
||||
while (numDirect > 0) {
|
||||
byteBufs.add(alloc.directBuffer(1));
|
||||
numDirect--;
|
||||
}
|
||||
while (numHeap > 0) {
|
||||
byteBufs.add(alloc.heapBuffer(1));
|
||||
numHeap--;
|
||||
}
|
||||
|
||||
Collections.shuffle(byteBufs);
|
||||
for (ByteBuf byteBuf : byteBufs) {
|
||||
comp.addComponent(byteBuf);
|
||||
}
|
||||
|
||||
assertEquals(byteBufs.toString(), expected, isBufferCopyNeededForWrite(comp));
|
||||
assertTrue(comp.release());
|
||||
}
|
||||
}
|
@ -496,7 +496,7 @@ static jint netty_unix_socket_sendToAddresses(JNIEnv* env, jclass clazz, jint fd
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct msghdr m;
|
||||
struct msghdr m = { 0 };
|
||||
m.msg_name = (void*) &addr;
|
||||
m.msg_namelen = addrSize;
|
||||
m.msg_iov = (struct iovec*) (intptr_t) memoryAddress;
|
||||
|
@ -16,9 +16,9 @@
|
||||
package io.netty.channel.unix;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import static io.netty.channel.unix.Limits.IOV_MAX;
|
||||
@ -76,22 +76,41 @@ public final class IovArray implements MessageProcessor {
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
public boolean add(ByteBuf buf) {
|
||||
if (count == IOV_MAX) {
|
||||
int nioBufferCount = buf.nioBufferCount();
|
||||
if (count + nioBufferCount > IOV_MAX) {
|
||||
// No more room!
|
||||
return false;
|
||||
}
|
||||
|
||||
final int len = buf.readableBytes();
|
||||
if (len == 0) {
|
||||
// No need to add an empty buffer.
|
||||
// We return true here because we want ChannelOutboundBuffer.forEachFlushedMessage() to continue
|
||||
// fetching the next buffers.
|
||||
if (nioBufferCount == 1) {
|
||||
final int len = buf.readableBytes();
|
||||
if (len == 0) {
|
||||
// No need to add an empty buffer.
|
||||
// We return true here because we want ChannelOutboundBuffer.forEachFlushedMessage() to continue
|
||||
// fetching the next buffers.
|
||||
return true;
|
||||
}
|
||||
|
||||
final long addr = buf.memoryAddress();
|
||||
final int offset = buf.readerIndex();
|
||||
return add(addr, offset, len);
|
||||
} else {
|
||||
ByteBuffer[] buffers = buf.nioBuffers();
|
||||
for (ByteBuffer nioBuffer : buffers) {
|
||||
int len = nioBuffer.remaining();
|
||||
if (len == 0) {
|
||||
// No need to add an empty buffer so just continue
|
||||
continue;
|
||||
}
|
||||
int offset = nioBuffer.position();
|
||||
long addr = PlatformDependent.directBufferAddress(nioBuffer);
|
||||
|
||||
if (!add(addr, offset, len)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
final long addr = buf.memoryAddress();
|
||||
final int offset = buf.readerIndex();
|
||||
return add(addr, offset, len);
|
||||
}
|
||||
|
||||
private boolean add(long addr, int offset, int len) {
|
||||
@ -126,32 +145,6 @@ public final class IovArray implements MessageProcessor {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to add the given {@link CompositeByteBuf}. Returns {@code true} on success,
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
public boolean add(CompositeByteBuf buf) {
|
||||
ByteBuffer[] buffers = buf.nioBuffers();
|
||||
if (count + buffers.length >= IOV_MAX) {
|
||||
// No more room!
|
||||
return false;
|
||||
}
|
||||
for (ByteBuffer nioBuffer : buffers) {
|
||||
int offset = nioBuffer.position();
|
||||
int len = nioBuffer.limit() - nioBuffer.position();
|
||||
if (len == 0) {
|
||||
// No need to add an empty buffer so just continue
|
||||
continue;
|
||||
}
|
||||
long addr = PlatformDependent.directBufferAddress(nioBuffer);
|
||||
|
||||
if (!add(addr, offset, len)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the written iov entries. This will return the length of the iov entry on the given index if it is
|
||||
* smaller then the given {@code written} value. Otherwise it returns {@code -1}.
|
||||
@ -213,11 +206,7 @@ public final class IovArray implements MessageProcessor {
|
||||
@Override
|
||||
public boolean processMessage(Object msg) throws Exception {
|
||||
if (msg instanceof ByteBuf) {
|
||||
if (msg instanceof CompositeByteBuf) {
|
||||
return add((CompositeByteBuf) msg);
|
||||
} else {
|
||||
return add((ByteBuf) msg);
|
||||
}
|
||||
return add((ByteBuf) msg);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright 2017 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.unix;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
import static io.netty.channel.unix.Limits.IOV_MAX;
|
||||
|
||||
public final class UnixChannelUtil {
|
||||
|
||||
private UnixChannelUtil() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the specified buffer has memory address or is composed of n(n <= IOV_MAX) NIO direct buffers.
|
||||
* (We check this because otherwise we need to make it a new direct buffer.)
|
||||
*/
|
||||
public static boolean isBufferCopyNeededForWrite(ByteBuf byteBuf) {
|
||||
return !(byteBuf.hasMemoryAddress() || byteBuf.isDirect() && byteBuf.nioBufferCount() <= IOV_MAX);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user