Also support CompositeByteBuf with SegmentedDatagramPacket (#11081)

Motivation:

c22c6b845d introduced support for
UDP_SEGMENT but did restrict it to continous buffers. This is not needed
as it is also fine to use CompositeByteBuf

Modifications:

- Allow to use CompositeByteBuf as well
- Add unit test

Result:

More flexible usage of segmented datagrams possible
This commit is contained in:
Norman Maurer 2021-03-12 15:37:46 +01:00 committed by GitHub
parent 3f23f59b87
commit 773ecb81fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 15 deletions

View File

@ -37,7 +37,7 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
* @param recipient the recipient.
*/
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) {
super(checkByteBuf(data), recipient);
super(data, recipient);
checkIsSupported();
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
}
@ -51,7 +51,7 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
*/
public SegmentedDatagramPacket(ByteBuf data, int segmentSize,
InetSocketAddress recipient, InetSocketAddress sender) {
super(checkByteBuf(data), recipient, sender);
super(data, recipient, sender);
checkIsSupported();
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
}
@ -118,13 +118,6 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
return this;
}
private static ByteBuf checkByteBuf(ByteBuf buffer) {
if (!buffer.isContiguous()) {
throw new IllegalArgumentException("Buffer needs to be continguous");
}
return buffer;
}
private static void checkIsSupported() {
if (!isSupported()) {
throw new IllegalStateException();

View File

@ -17,6 +17,7 @@ package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@ -55,7 +56,20 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
run();
}
public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb)
public void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb) throws Throwable {
testSendSegmentedDatagramPacket(sb, cb, false);
}
@Test
public void testSendSegmentedDatagramPacketComposite() throws Throwable {
run();
}
public void testSendSegmentedDatagramPacketComposite(Bootstrap sb, Bootstrap cb) throws Throwable {
testSendSegmentedDatagramPacket(sb, cb, true);
}
private void testSendSegmentedDatagramPacket(Bootstrap sb, Bootstrap cb, boolean composite)
throws Throwable {
if (!(cb.group() instanceof EpollEventLoopGroup)) {
// Only supported for the native epoll transport.
@ -66,19 +80,19 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
Channel cc = null;
try {
cb.handler(new SimpleChannelInboundHandler() {
cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
// Nothing will be sent.
}
});
final SocketAddress sender;
cc = cb.bind(newSocketAddress()).sync().channel();
final int numBuffers = 16;
final int segmentSize = 512;
int bufferCapacity = 16 * segmentSize;
final CountDownLatch latch = new CountDownLatch(bufferCapacity / segmentSize);
int bufferCapacity = numBuffers * segmentSize;
final CountDownLatch latch = new CountDownLatch(numBuffers);
AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
sc = sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
@ -90,7 +104,17 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
}).bind(newSocketAddress()).sync().channel();
InetSocketAddress addr = sendToAddress((InetSocketAddress) sc.localAddress());
ByteBuf buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity);
final ByteBuf buffer;
if (composite) {
CompositeByteBuf compositeBuffer = Unpooled.compositeBuffer();
for (int i = 0; i < numBuffers; i++) {
compositeBuffer.addComponent(true,
Unpooled.directBuffer(segmentSize).writeZero(segmentSize));
}
buffer = compositeBuffer;
} else {
buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity);
}
cc.writeAndFlush(new SegmentedDatagramPacket(buffer, segmentSize, addr)).sync();
if (!latch.await(10, TimeUnit.SECONDS)) {