Also support CompositeByteBuf with SegmentedDatagramPacket (#11081)

Motivation:

c22c6b845d introduced support for
UDP_SEGMENT but did restrict it to continous buffers. This is not needed
as it is also fine to use CompositeByteBuf

Modifications:

- Allow to use CompositeByteBuf as well
- Add unit test

Result:

More flexible usage of segmented datagrams possible
This commit is contained in:
Norman Maurer 2021-03-12 15:37:46 +01:00
parent 00eb5618df
commit 5e11c007f7
2 changed files with 32 additions and 15 deletions

View File

@ -37,7 +37,7 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
* @param recipient the recipient. * @param recipient the recipient.
*/ */
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) { public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) {
super(checkByteBuf(data), recipient); super(data, recipient);
checkIsSupported(); checkIsSupported();
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize"); this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
} }
@ -51,7 +51,7 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
*/ */
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, public SegmentedDatagramPacket(ByteBuf data, int segmentSize,
InetSocketAddress recipient, InetSocketAddress sender) { InetSocketAddress recipient, InetSocketAddress sender) {
super(checkByteBuf(data), recipient, sender); super(data, recipient, sender);
checkIsSupported(); checkIsSupported();
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize"); this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
} }
@ -118,13 +118,6 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
return this; return this;
} }
private static ByteBuf checkByteBuf(ByteBuf buffer) {
if (!buffer.isContiguous()) {
throw new IllegalArgumentException("Buffer needs to be continguous");
}
return buffer;
}
private static void checkIsSupported() { private static void checkIsSupported() {
if (!isSupported()) { if (!isSupported()) {
throw new IllegalStateException(); throw new IllegalStateException();

View File

@ -17,6 +17,7 @@ package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -55,7 +56,20 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
run(); run();
} }
public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable {
testSendSegmentedDatagramPacket(sb, cb, false);
}
@Test
public void testSendSegmentedDatagramPacketComposite() throws Throwable {
run();
}
public void testSendSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable {
testSendSegmentedDatagramPacket(sb, cb, true);
}
private void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb, boolean composite)
throws Throwable { throws Throwable {
if (!(cb.group() instanceof EpollEventLoopGroup)) { if (!(cb.group() instanceof EpollEventLoopGroup)) {
// Only supported for the native epoll transport. // Only supported for the native epoll transport.
@ -66,19 +80,19 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
Channel cc = null; Channel cc = null;
try { try {
cb.handler(new SimpleChannelInboundHandler() { cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, Object msgs) throws Exception { public void messageReceived(ChannelHandlerContext ctx, Object msgs) throws Exception {
// Nothing will be sent. // Nothing will be sent.
} }
}); });
final SocketAddress sender;
cc = cb.bind(newSocketAddress()).sync().channel(); cc = cb.bind(newSocketAddress()).sync().channel();
final int numBuffers = 16;
final int segmentSize = 512; final int segmentSize = 512;
int bufferCapacity = 16 * segmentSize; int bufferCapacity = numBuffers * segmentSize;
final CountDownLatch latch = new CountDownLatch(bufferCapacity / segmentSize); final CountDownLatch latch = new CountDownLatch(numBuffers);
AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>(); AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
sc = sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() { sc = sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override @Override
@ -90,7 +104,17 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
}).bind(newSocketAddress()).sync().channel(); }).bind(newSocketAddress()).sync().channel();
InetSocketAddress addr = sendToAddress((InetSocketAddress) sc.localAddress()); InetSocketAddress addr = sendToAddress((InetSocketAddress) sc.localAddress());
ByteBuf buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity); final ByteBuf buffer;
if (composite) {
CompositeByteBuf compositeBuffer = Unpooled.compositeBuffer();
for (int i = 0; i < numBuffers; i++) {
compositeBuffer.addComponent(true,
Unpooled.directBuffer(segmentSize).writeZero(segmentSize));
}
buffer = compositeBuffer;
} else {
buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity);
}
cc.writeAndFlush(new SegmentedDatagramPacket(buffer, segmentSize, addr)).sync(); cc.writeAndFlush(new SegmentedDatagramPacket(buffer, segmentSize, addr)).sync();
if (!latch.await(10, TimeUnit.SECONDS)) { if (!latch.await(10, TimeUnit.SECONDS)) {