Move SegmentedDatagramPacket to transport-native-unix-common (#11121)
Motivation: As we can supported SegmentedDatagramPacket in multiple native transports (like in epoll and io_uring) we should just move it to unix-common so we can share code. Modification: - Move SegmentedDatagrampPacket to transport-native-unixu - Mark the SegmentedDatagramPacket in epoll as deprecated - Update code to use updated package. Result: Possibility of code re-use
This commit is contained in:
parent
aaef54951f
commit
6c741932aa
@ -65,6 +65,17 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
private final EpollDatagramChannelConfig config;
|
private final EpollDatagramChannelConfig config;
|
||||||
private volatile boolean connected;
|
private volatile boolean connected;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported natively.
|
||||||
|
*
|
||||||
|
* @return {@code true} if supported, {@code false} otherwise.
|
||||||
|
*/
|
||||||
|
public static boolean isSegmentedDatagramPacketSupported() {
|
||||||
|
return Epoll.isAvailable() &&
|
||||||
|
// We only support it together with sendmmsg(...)
|
||||||
|
Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new instance which selects the {@link InternetProtocolFamily} to use depending
|
* Create a new instance which selects the {@link InternetProtocolFamily} to use depending
|
||||||
* on the Operation Systems default which will be chosen.
|
* on the Operation Systems default which will be chosen.
|
||||||
@ -298,7 +309,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
|
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
|
||||||
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
|
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
|
||||||
// We only handle UDP_SEGMENT in sendmmsg.
|
// We only handle UDP_SEGMENT in sendmmsg.
|
||||||
in.current() instanceof SegmentedDatagramPacket) {
|
in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
|
||||||
NativeDatagramPacketArray array = cleanDatagramPacketArray();
|
NativeDatagramPacketArray array = cleanDatagramPacketArray();
|
||||||
array.add(in, isConnected(), maxMessagesPerWrite);
|
array.add(in, isConnected(), maxMessagesPerWrite);
|
||||||
int cnt = array.count();
|
int cnt = array.count();
|
||||||
@ -376,12 +387,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object filterOutboundMessage(Object msg) {
|
protected Object filterOutboundMessage(Object msg) {
|
||||||
if (msg instanceof SegmentedDatagramPacket) {
|
if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
|
||||||
if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
|
if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
|
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
|
||||||
}
|
}
|
||||||
SegmentedDatagramPacket packet = (SegmentedDatagramPacket) msg;
|
io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
|
||||||
ByteBuf content = packet.content();
|
ByteBuf content = packet.content();
|
||||||
return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
|
return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
|
||||||
packet.replace(newDirectBuffer(packet, content)) : msg;
|
packet.replace(newDirectBuffer(packet, content)) : msg;
|
||||||
@ -568,8 +579,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
|
|
||||||
private static void addDatagramPacketToOut(DatagramPacket packet,
|
private static void addDatagramPacketToOut(DatagramPacket packet,
|
||||||
RecyclableArrayList out) {
|
RecyclableArrayList out) {
|
||||||
if (packet instanceof SegmentedDatagramPacket) {
|
if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
|
||||||
SegmentedDatagramPacket segmentedDatagramPacket = (SegmentedDatagramPacket) packet;
|
io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
|
||||||
|
(io.netty.channel.unix.SegmentedDatagramPacket) packet;
|
||||||
ByteBuf content = segmentedDatagramPacket.content();
|
ByteBuf content = segmentedDatagramPacket.content();
|
||||||
InetSocketAddress recipient = segmentedDatagramPacket.recipient();
|
InetSocketAddress recipient = segmentedDatagramPacket.recipient();
|
||||||
InetSocketAddress sender = segmentedDatagramPacket.sender();
|
InetSocketAddress sender = segmentedDatagramPacket.sender();
|
||||||
@ -635,7 +647,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
byteBuf.writerIndex(bytesReceived);
|
byteBuf.writerIndex(bytesReceived);
|
||||||
InetSocketAddress local = localAddress();
|
InetSocketAddress local = localAddress();
|
||||||
DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
|
DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
|
||||||
if (!(packet instanceof SegmentedDatagramPacket)) {
|
if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
|
||||||
processPacket(pipeline(), allocHandle, bytesReceived, packet);
|
processPacket(pipeline(), allocHandle, bytesReceived, packet);
|
||||||
byteBuf = null;
|
byteBuf = null;
|
||||||
} else {
|
} else {
|
||||||
@ -685,7 +697,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
if (received == 1) {
|
if (received == 1) {
|
||||||
// Single packet fast-path
|
// Single packet fast-path
|
||||||
DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
|
DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
|
||||||
if (!(packet instanceof SegmentedDatagramPacket)) {
|
if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
|
||||||
processPacket(pipeline(), allocHandle, datagramSize, packet);
|
processPacket(pipeline(), allocHandle, datagramSize, packet);
|
||||||
byteBuf = null;
|
byteBuf = null;
|
||||||
return true;
|
return true;
|
||||||
|
@ -119,8 +119,8 @@ final class NativeDatagramPacketArray {
|
|||||||
DatagramPacket packet = (DatagramPacket) msg;
|
DatagramPacket packet = (DatagramPacket) msg;
|
||||||
ByteBuf buf = packet.content();
|
ByteBuf buf = packet.content();
|
||||||
int segmentSize = 0;
|
int segmentSize = 0;
|
||||||
if (packet instanceof SegmentedDatagramPacket) {
|
if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
|
||||||
int seg = ((SegmentedDatagramPacket) packet).segmentSize();
|
int seg = ((io.netty.channel.unix.SegmentedDatagramPacket) packet).segmentSize();
|
||||||
// We only need to tell the kernel that we want to use UDP_SEGMENT if there are multiple
|
// We only need to tell the kernel that we want to use UDP_SEGMENT if there are multiple
|
||||||
// segments in the packet.
|
// segments in the packet.
|
||||||
if (buf.readableBytes() > seg) {
|
if (buf.readableBytes() > seg) {
|
||||||
|
@ -16,18 +16,14 @@
|
|||||||
package io.netty.channel.epoll;
|
package io.netty.channel.epoll;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.socket.DatagramPacket;
|
|
||||||
import io.netty.util.internal.ObjectUtil;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allows to use <a href="https://blog.cloudflare.com/accelerating-udp-packet-transmission-for-quic/">GSO</a>
|
* @deprecated use {@link io.netty.channel.unix.SegmentedDatagramPacket}.
|
||||||
* if the underlying OS supports it. Before instance and use this class you should check {@link #isSupported()}.
|
|
||||||
*/
|
*/
|
||||||
public final class SegmentedDatagramPacket extends DatagramPacket {
|
@Deprecated
|
||||||
|
public final class SegmentedDatagramPacket extends io.netty.channel.unix.SegmentedDatagramPacket {
|
||||||
private final int segmentSize;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new instance.
|
* Create a new instance.
|
||||||
@ -37,9 +33,8 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
|
|||||||
* @param recipient the recipient.
|
* @param recipient the recipient.
|
||||||
*/
|
*/
|
||||||
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) {
|
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) {
|
||||||
super(data, recipient);
|
super(data, segmentSize, recipient);
|
||||||
checkIsSupported();
|
checkIsSupported();
|
||||||
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -51,9 +46,8 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
|
|||||||
*/
|
*/
|
||||||
public SegmentedDatagramPacket(ByteBuf data, int segmentSize,
|
public SegmentedDatagramPacket(ByteBuf data, int segmentSize,
|
||||||
InetSocketAddress recipient, InetSocketAddress sender) {
|
InetSocketAddress recipient, InetSocketAddress sender) {
|
||||||
super(data, recipient, sender);
|
super(data, segmentSize, recipient, sender);
|
||||||
checkIsSupported();
|
checkIsSupported();
|
||||||
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -65,33 +59,24 @@ public final class SegmentedDatagramPacket extends DatagramPacket {
|
|||||||
Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
|
Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the size of each segment (the last segment can be smaller).
|
|
||||||
*
|
|
||||||
* @return size of segments.
|
|
||||||
*/
|
|
||||||
public int segmentSize() {
|
|
||||||
return segmentSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SegmentedDatagramPacket copy() {
|
public SegmentedDatagramPacket copy() {
|
||||||
return new SegmentedDatagramPacket(content().copy(), segmentSize, recipient(), sender());
|
return new SegmentedDatagramPacket(content().copy(), segmentSize(), recipient(), sender());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SegmentedDatagramPacket duplicate() {
|
public SegmentedDatagramPacket duplicate() {
|
||||||
return new SegmentedDatagramPacket(content().duplicate(), segmentSize, recipient(), sender());
|
return new SegmentedDatagramPacket(content().duplicate(), segmentSize(), recipient(), sender());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SegmentedDatagramPacket retainedDuplicate() {
|
public SegmentedDatagramPacket retainedDuplicate() {
|
||||||
return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize, recipient(), sender());
|
return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize(), recipient(), sender());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SegmentedDatagramPacket replace(ByteBuf content) {
|
public SegmentedDatagramPacket replace(ByteBuf content) {
|
||||||
return new SegmentedDatagramPacket(content, segmentSize, recipient(), sender());
|
return new SegmentedDatagramPacket(content, segmentSize(), recipient(), sender());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -99,14 +99,14 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
|
|||||||
// Only supported for the native epoll transport.
|
// Only supported for the native epoll transport.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Assume.assumeTrue(SegmentedDatagramPacket.isSupported());
|
Assume.assumeTrue(EpollDatagramChannel.isSegmentedDatagramPacketSupported());
|
||||||
Channel sc = null;
|
Channel sc = null;
|
||||||
Channel cc = null;
|
Channel cc = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
cb.handler(new SimpleChannelInboundHandler<Object>() {
|
cb.handler(new SimpleChannelInboundHandler<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
|
public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
|
||||||
// Nothing will be sent.
|
// Nothing will be sent.
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -148,7 +148,7 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest {
|
|||||||
} else {
|
} else {
|
||||||
buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity);
|
buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity);
|
||||||
}
|
}
|
||||||
cc.writeAndFlush(new SegmentedDatagramPacket(buffer, segmentSize, addr)).sync();
|
cc.writeAndFlush(new io.netty.channel.unix.SegmentedDatagramPacket(buffer, segmentSize, addr)).sync();
|
||||||
|
|
||||||
if (!latch.await(10, TimeUnit.SECONDS)) {
|
if (!latch.await(10, TimeUnit.SECONDS)) {
|
||||||
Throwable error = errorRef.get();
|
Throwable error = errorRef.get();
|
||||||
|
@ -0,0 +1,109 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2021 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel.unix;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.channel.socket.DatagramPacket;
|
||||||
|
import io.netty.util.internal.ObjectUtil;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allows to use <a href="https://blog.cloudflare.com/accelerating-udp-packet-transmission-for-quic/">GSO</a>
|
||||||
|
* if the underlying OS supports it. Before using this you should ensure your system support it.
|
||||||
|
*/
|
||||||
|
public class SegmentedDatagramPacket extends DatagramPacket {
|
||||||
|
|
||||||
|
private final int segmentSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new instance.
|
||||||
|
*
|
||||||
|
* @param data the {@link ByteBuf} which must be continguous.
|
||||||
|
* @param segmentSize the segment size.
|
||||||
|
* @param recipient the recipient.
|
||||||
|
*/
|
||||||
|
public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) {
|
||||||
|
super(data, recipient);
|
||||||
|
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new instance.
|
||||||
|
*
|
||||||
|
* @param data the {@link ByteBuf} which must be continguous.
|
||||||
|
* @param segmentSize the segment size.
|
||||||
|
* @param recipient the recipient.
|
||||||
|
*/
|
||||||
|
public SegmentedDatagramPacket(ByteBuf data, int segmentSize,
|
||||||
|
InetSocketAddress recipient, InetSocketAddress sender) {
|
||||||
|
super(data, recipient, sender);
|
||||||
|
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the size of each segment (the last segment can be smaller).
|
||||||
|
*
|
||||||
|
* @return size of segments.
|
||||||
|
*/
|
||||||
|
public int segmentSize() {
|
||||||
|
return segmentSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentedDatagramPacket copy() {
|
||||||
|
return new SegmentedDatagramPacket(content().copy(), segmentSize, recipient(), sender());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentedDatagramPacket duplicate() {
|
||||||
|
return new SegmentedDatagramPacket(content().duplicate(), segmentSize, recipient(), sender());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentedDatagramPacket retainedDuplicate() {
|
||||||
|
return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize, recipient(), sender());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentedDatagramPacket replace(ByteBuf content) {
|
||||||
|
return new SegmentedDatagramPacket(content, segmentSize, recipient(), sender());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentedDatagramPacket retain() {
|
||||||
|
super.retain();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentedDatagramPacket retain(int increment) {
|
||||||
|
super.retain(increment);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentedDatagramPacket touch() {
|
||||||
|
super.touch();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentedDatagramPacket touch(Object hint) {
|
||||||
|
super.touch(hint);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user