Move SegmentedDatagramPacket to transport-native-unix-common (#11121)

Motivation:

As we can supported SegmentedDatagramPacket in multiple native
transports (like in epoll and io_uring) we should just move it to
unix-common so we can share code.

Modification:

- Move SegmentedDatagrampPacket to transport-native-unixu
- Mark the SegmentedDatagramPacket in epoll as deprecated
- Update code to use updated package.

Result:

Possibility of code re-use
This commit is contained in:
Norman Maurer 2021-03-29 13:55:43 +02:00
parent 9403ceaeae
commit 72cdeae320
5 changed files with 142 additions and 36 deletions

View File

@ -67,6 +67,17 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
private final EpollDatagramChannelConfig config; private final EpollDatagramChannelConfig config;
private volatile boolean connected; private volatile boolean connected;
/**
* Returns {@code true} if {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported natively.
*
* @return {@code true} if supported, {@code false} otherwise.
*/
public static boolean isSegmentedDatagramPacketSupported() {
return Epoll.isAvailable() &&
// We only support it together with sendmmsg(...)
Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
}
/** /**
* Create a new instance which selects the {@link InternetProtocolFamily} to use depending * Create a new instance which selects the {@link InternetProtocolFamily} to use depending
* on the Operation Systems default which will be chosen. * on the Operation Systems default which will be chosen.
@ -299,7 +310,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+ // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 || if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
// We only handle UDP_SEGMENT in sendmmsg. // We only handle UDP_SEGMENT in sendmmsg.
in.current() instanceof SegmentedDatagramPacket) { in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
NativeDatagramPacketArray array = cleanDatagramPacketArray(); NativeDatagramPacketArray array = cleanDatagramPacketArray();
array.add(in, isConnected(), maxMessagesPerWrite); array.add(in, isConnected(), maxMessagesPerWrite);
int cnt = array.count(); int cnt = array.count();
@ -377,12 +388,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override @Override
protected Object filterOutboundMessage(Object msg) { protected Object filterOutboundMessage(Object msg) {
if (msg instanceof SegmentedDatagramPacket) { if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
if (!Native.IS_SUPPORTING_UDP_SEGMENT) { if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
} }
SegmentedDatagramPacket packet = (SegmentedDatagramPacket) msg; io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
ByteBuf content = packet.content(); ByteBuf content = packet.content();
return UnixChannelUtil.isBufferCopyNeededForWrite(content) ? return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
packet.replace(newDirectBuffer(packet, content)) : msg; packet.replace(newDirectBuffer(packet, content)) : msg;
@ -569,8 +580,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
private static void addDatagramPacketToOut(DatagramPacket packet, private static void addDatagramPacketToOut(DatagramPacket packet,
RecyclableArrayList out) { RecyclableArrayList out) {
if (packet instanceof SegmentedDatagramPacket) { if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
SegmentedDatagramPacket segmentedDatagramPacket = (SegmentedDatagramPacket) packet; io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
(io.netty.channel.unix.SegmentedDatagramPacket) packet;
ByteBuf content = segmentedDatagramPacket.content(); ByteBuf content = segmentedDatagramPacket.content();
InetSocketAddress recipient = segmentedDatagramPacket.recipient(); InetSocketAddress recipient = segmentedDatagramPacket.recipient();
InetSocketAddress sender = segmentedDatagramPacket.sender(); InetSocketAddress sender = segmentedDatagramPacket.sender();
@ -636,7 +648,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
byteBuf.writerIndex(bytesReceived); byteBuf.writerIndex(bytesReceived);
InetSocketAddress local = localAddress(); InetSocketAddress local = localAddress();
DatagramPacket packet = msg.newDatagramPacket(byteBuf, local); DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
if (!(packet instanceof SegmentedDatagramPacket)) { if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
processPacket(pipeline(), allocHandle, bytesReceived, packet); processPacket(pipeline(), allocHandle, bytesReceived, packet);
byteBuf = null; byteBuf = null;
} else { } else {
@ -686,7 +698,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (received == 1) { if (received == 1) {
// Single packet fast-path // Single packet fast-path
DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local); DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
if (!(packet instanceof SegmentedDatagramPacket)) { if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
processPacket(pipeline(), allocHandle, datagramSize, packet); processPacket(pipeline(), allocHandle, datagramSize, packet);
byteBuf = null; byteBuf = null;
return true; return true;

View File

@ -120,8 +120,8 @@ final class NativeDatagramPacketArray {
DatagramPacket packet = (DatagramPacket) msg; DatagramPacket packet = (DatagramPacket) msg;
ByteBuf buf = packet.content(); ByteBuf buf = packet.content();
int segmentSize = 0; int segmentSize = 0;
if (packet instanceof SegmentedDatagramPacket) { if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
int seg = ((SegmentedDatagramPacket) packet).segmentSize(); int seg = ((io.netty.channel.unix.SegmentedDatagramPacket) packet).segmentSize();
// We only need to tell the kernel that we want to use UDP_SEGMENT if there are multiple // We only need to tell the kernel that we want to use UDP_SEGMENT if there are multiple
// segments in the packet. // segments in the packet.
if (buf.readableBytes() > seg) { if (buf.readableBytes() > seg) {

View File

@ -16,18 +16,14 @@
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.ObjectUtil;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
/** /**
* Allows to use <a href="https://blog.cloudflare.com/accelerating-udp-packet-transmission-for-quic/">GSO</a> * @deprecated use {@link io.netty.channel.unix.SegmentedDatagramPacket}.
* if the underlying OS supports it. Before instance and use this class you should check {@link #isSupported()}.
*/ */
public final class SegmentedDatagramPacket extends DatagramPacket { @Deprecated
public final class SegmentedDatagramPacket extends io.netty.channel.unix.SegmentedDatagramPacket {
private final int segmentSize;
/** /**
* Create a new instance. * Create a new instance.
@ -37,9 +33,8 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
* @param recipient the recipient. * @param recipient the recipient.
*/ */
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) { public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) {
super(data, recipient); super(data, segmentSize, recipient);
checkIsSupported(); checkIsSupported();
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
} }
/** /**
@ -51,9 +46,8 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
*/ */
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, public SegmentedDatagramPacket(ByteBuf data, int segmentSize,
InetSocketAddress recipient, InetSocketAddress sender) { InetSocketAddress recipient, InetSocketAddress sender) {
super(data, recipient, sender); super(data, segmentSize, recipient, sender);
checkIsSupported(); checkIsSupported();
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
} }
/** /**
@ -65,33 +59,24 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT; Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
} }
/**
* Return the size of each segment (the last segment can be smaller).
*
* @return size of segments.
*/
public int segmentSize() {
return segmentSize;
}
@Override @Override
public SegmentedDatagramPacket copy() { public SegmentedDatagramPacket copy() {
return new SegmentedDatagramPacket(content().copy(), segmentSize, recipient(), sender()); return new SegmentedDatagramPacket(content().copy(), segmentSize(), recipient(), sender());
} }
@Override @Override
public SegmentedDatagramPacket duplicate() { public SegmentedDatagramPacket duplicate() {
return new SegmentedDatagramPacket(content().duplicate(), segmentSize, recipient(), sender()); return new SegmentedDatagramPacket(content().duplicate(), segmentSize(), recipient(), sender());
} }
@Override @Override
public SegmentedDatagramPacket retainedDuplicate() { public SegmentedDatagramPacket retainedDuplicate() {
return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize, recipient(), sender()); return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize(), recipient(), sender());
} }
@Override @Override
public SegmentedDatagramPacket replace(ByteBuf content) { public SegmentedDatagramPacket replace(ByteBuf content) {
return new SegmentedDatagramPacket(content, segmentSize, recipient(), sender()); return new SegmentedDatagramPacket(content, segmentSize(), recipient(), sender());
} }
@Override @Override

View File

@ -99,14 +99,14 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
// Only supported for the native epoll transport. // Only supported for the native epoll transport.
return; return;
} }
Assume.assumeTrue(SegmentedDatagramPacket.isSupported()); Assume.assumeTrue(EpollDatagramChannel.isSegmentedDatagramPacketSupported());
Channel sc = null; Channel sc = null;
Channel cc = null; Channel cc = null;
try { try {
cb.handler(new SimpleChannelInboundHandler<Object>() { cb.handler(new SimpleChannelInboundHandler<Object>() {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, Object msgs) throws Exception { public void messageReceived(ChannelHandlerContext ctx, Object msgs) {
// Nothing will be sent. // Nothing will be sent.
} }
}); });
@ -148,7 +148,7 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
} else { } else {
buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity); buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity);
} }
cc.writeAndFlush(new SegmentedDatagramPacket(buffer, segmentSize, addr)).sync(); cc.writeAndFlush(new io.netty.channel.unix.SegmentedDatagramPacket(buffer, segmentSize, addr)).sync();
if (!latch.await(10, TimeUnit.SECONDS)) { if (!latch.await(10, TimeUnit.SECONDS)) {
Throwable error = errorRef.get(); Throwable error = errorRef.get();

View File

@ -0,0 +1,109 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.unix;
import io.netty.buffer.ByteBuf;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.ObjectUtil;
import java.net.InetSocketAddress;
/**
* Allows to use <a href="https://blog.cloudflare.com/accelerating-udp-packet-transmission-for-quic/">GSO</a>
* if the underlying OS supports it. Before using this you should ensure your system support it.
*/
public class SegmentedDatagramPacket extends DatagramPacket {
private final int segmentSize;
/**
* Create a new instance.
*
* @param data the {@link ByteBuf} which must be continguous.
* @param segmentSize the segment size.
* @param recipient the recipient.
*/
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) {
super(data, recipient);
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
}
/**
* Create a new instance.
*
* @param data the {@link ByteBuf} which must be continguous.
* @param segmentSize the segment size.
* @param recipient the recipient.
*/
public SegmentedDatagramPacket(ByteBuf data, int segmentSize,
InetSocketAddress recipient, InetSocketAddress sender) {
super(data, recipient, sender);
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
}
/**
* Return the size of each segment (the last segment can be smaller).
*
* @return size of segments.
*/
public int segmentSize() {
return segmentSize;
}
@Override
public SegmentedDatagramPacket copy() {
return new SegmentedDatagramPacket(content().copy(), segmentSize, recipient(), sender());
}
@Override
public SegmentedDatagramPacket duplicate() {
return new SegmentedDatagramPacket(content().duplicate(), segmentSize, recipient(), sender());
}
@Override
public SegmentedDatagramPacket retainedDuplicate() {
return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize, recipient(), sender());
}
@Override
public SegmentedDatagramPacket replace(ByteBuf content) {
return new SegmentedDatagramPacket(content, segmentSize, recipient(), sender());
}
@Override
public SegmentedDatagramPacket retain() {
super.retain();
return this;
}
@Override
public SegmentedDatagramPacket retain(int increment) {
super.retain(increment);
return this;
}
@Override
public SegmentedDatagramPacket touch() {
super.touch();
return this;
}
@Override
public SegmentedDatagramPacket touch(Object hint) {
super.touch(hint);
return this;
}
}