From 6c741932aad04d79377c03e6ce9dd116a9baa45f Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 29 Mar 2021 13:55:43 +0200 Subject: [PATCH] Move SegmentedDatagramPacket to transport-native-unix-common (#11121) Motivation: As we can supported SegmentedDatagramPacket in multiple native transports (like in epoll and io_uring) we should just move it to unix-common so we can share code. Modification: - Move SegmentedDatagrampPacket to transport-native-unixu - Mark the SegmentedDatagramPacket in epoll as deprecated - Update code to use updated package. Result: Possibility of code re-use --- .../channel/epoll/EpollDatagramChannel.java | 26 +++-- .../epoll/NativeDatagramPacketArray.java | 4 +- .../epoll/SegmentedDatagramPacket.java | 33 ++---- .../epoll/EpollDatagramUnicastTest.java | 6 +- .../channel/unix/SegmentedDatagramPacket.java | 109 ++++++++++++++++++ 5 files changed, 142 insertions(+), 36 deletions(-) create mode 100644 transport-native-unix-common/src/main/java/io/netty/channel/unix/SegmentedDatagramPacket.java diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 7a27e0b8ea..87cf0e39d6 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -65,6 +65,17 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements private final EpollDatagramChannelConfig config; private volatile boolean connected; + /** + * Returns {@code true} if {@link io.netty.channel.unix.SegmentedDatagramPacket} is supported natively. + * + * @return {@code true} if supported, {@code false} otherwise. + */ + public static boolean isSegmentedDatagramPacketSupported() { + return Epoll.isAvailable() && + // We only support it together with sendmmsg(...) + Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT; + } + /** * Create a new instance which selects the {@link InternetProtocolFamily} to use depending * on the Operation Systems default which will be chosen. @@ -298,7 +309,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+ if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 || // We only handle UDP_SEGMENT in sendmmsg. - in.current() instanceof SegmentedDatagramPacket) { + in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) { NativeDatagramPacketArray array = cleanDatagramPacketArray(); array.add(in, isConnected(), maxMessagesPerWrite); int cnt = array.count(); @@ -376,12 +387,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override protected Object filterOutboundMessage(Object msg) { - if (msg instanceof SegmentedDatagramPacket) { + if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) { if (!Native.IS_SUPPORTING_UDP_SEGMENT) { throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); } - SegmentedDatagramPacket packet = (SegmentedDatagramPacket) msg; + io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg; ByteBuf content = packet.content(); return UnixChannelUtil.isBufferCopyNeededForWrite(content) ? packet.replace(newDirectBuffer(packet, content)) : msg; @@ -568,8 +579,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements private static void addDatagramPacketToOut(DatagramPacket packet, RecyclableArrayList out) { - if (packet instanceof SegmentedDatagramPacket) { - SegmentedDatagramPacket segmentedDatagramPacket = (SegmentedDatagramPacket) packet; + if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) { + io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket = + (io.netty.channel.unix.SegmentedDatagramPacket) packet; ByteBuf content = segmentedDatagramPacket.content(); InetSocketAddress recipient = segmentedDatagramPacket.recipient(); InetSocketAddress sender = segmentedDatagramPacket.sender(); @@ -635,7 +647,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements byteBuf.writerIndex(bytesReceived); InetSocketAddress local = localAddress(); DatagramPacket packet = msg.newDatagramPacket(byteBuf, local); - if (!(packet instanceof SegmentedDatagramPacket)) { + if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) { processPacket(pipeline(), allocHandle, bytesReceived, packet); byteBuf = null; } else { @@ -685,7 +697,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (received == 1) { // Single packet fast-path DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local); - if (!(packet instanceof SegmentedDatagramPacket)) { + if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) { processPacket(pipeline(), allocHandle, datagramSize, packet); byteBuf = null; return true; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java index f9062d68a7..602fc5f690 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -119,8 +119,8 @@ final class NativeDatagramPacketArray { DatagramPacket packet = (DatagramPacket) msg; ByteBuf buf = packet.content(); int segmentSize = 0; - if (packet instanceof SegmentedDatagramPacket) { - int seg = ((SegmentedDatagramPacket) packet).segmentSize(); + if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) { + int seg = ((io.netty.channel.unix.SegmentedDatagramPacket) packet).segmentSize(); // We only need to tell the kernel that we want to use UDP_SEGMENT if there are multiple // segments in the packet. if (buf.readableBytes() > seg) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/SegmentedDatagramPacket.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/SegmentedDatagramPacket.java index 83358c5b89..50bca1f997 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/SegmentedDatagramPacket.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/SegmentedDatagramPacket.java @@ -16,18 +16,14 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; -import io.netty.channel.socket.DatagramPacket; -import io.netty.util.internal.ObjectUtil; import java.net.InetSocketAddress; /** - * Allows to use GSO - * if the underlying OS supports it. Before instance and use this class you should check {@link #isSupported()}. + * @deprecated use {@link io.netty.channel.unix.SegmentedDatagramPacket}. */ -public final class SegmentedDatagramPacket extends DatagramPacket { - - private final int segmentSize; +@Deprecated +public final class SegmentedDatagramPacket extends io.netty.channel.unix.SegmentedDatagramPacket { /** * Create a new instance. @@ -37,9 +33,8 @@ public final class SegmentedDatagramPacket extends DatagramPacket { * @param recipient the recipient. */ public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) { - super(data, recipient); + super(data, segmentSize, recipient); checkIsSupported(); - this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize"); } /** @@ -51,9 +46,8 @@ public final class SegmentedDatagramPacket extends DatagramPacket { */ public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient, InetSocketAddress sender) { - super(data, recipient, sender); + super(data, segmentSize, recipient, sender); checkIsSupported(); - this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize"); } /** @@ -65,33 +59,24 @@ public final class SegmentedDatagramPacket extends DatagramPacket { Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT; } - /** - * Return the size of each segment (the last segment can be smaller). - * - * @return size of segments. - */ - public int segmentSize() { - return segmentSize; - } - @Override public SegmentedDatagramPacket copy() { - return new SegmentedDatagramPacket(content().copy(), segmentSize, recipient(), sender()); + return new SegmentedDatagramPacket(content().copy(), segmentSize(), recipient(), sender()); } @Override public SegmentedDatagramPacket duplicate() { - return new SegmentedDatagramPacket(content().duplicate(), segmentSize, recipient(), sender()); + return new SegmentedDatagramPacket(content().duplicate(), segmentSize(), recipient(), sender()); } @Override public SegmentedDatagramPacket retainedDuplicate() { - return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize, recipient(), sender()); + return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize(), recipient(), sender()); } @Override public SegmentedDatagramPacket replace(ByteBuf content) { - return new SegmentedDatagramPacket(content, segmentSize, recipient(), sender()); + return new SegmentedDatagramPacket(content, segmentSize(), recipient(), sender()); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java index 1a2565bb8b..c52b77eb16 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java @@ -99,14 +99,14 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest { // Only supported for the native epoll transport. return; } - Assume.assumeTrue(SegmentedDatagramPacket.isSupported()); + Assume.assumeTrue(EpollDatagramChannel.isSegmentedDatagramPacketSupported()); Channel sc = null; Channel cc = null; try { cb.handler(new SimpleChannelInboundHandler() { @Override - public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception { + public void channelRead0(ChannelHandlerContext ctx, Object msgs) { // Nothing will be sent. } }); @@ -148,7 +148,7 @@ public class EpollDatagramUnicastTest extends DatagramUnicastTest { } else { buffer = Unpooled.directBuffer(bufferCapacity).writeZero(bufferCapacity); } - cc.writeAndFlush(new SegmentedDatagramPacket(buffer, segmentSize, addr)).sync(); + cc.writeAndFlush(new io.netty.channel.unix.SegmentedDatagramPacket(buffer, segmentSize, addr)).sync(); if (!latch.await(10, TimeUnit.SECONDS)) { Throwable error = errorRef.get(); diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/SegmentedDatagramPacket.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/SegmentedDatagramPacket.java new file mode 100644 index 0000000000..01f21ce994 --- /dev/null +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/SegmentedDatagramPacket.java @@ -0,0 +1,109 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.unix; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.internal.ObjectUtil; + +import java.net.InetSocketAddress; + +/** + * Allows to use GSO + * if the underlying OS supports it. Before using this you should ensure your system support it. + */ +public class SegmentedDatagramPacket extends DatagramPacket { + + private final int segmentSize; + + /** + * Create a new instance. + * + * @param data the {@link ByteBuf} which must be continguous. + * @param segmentSize the segment size. + * @param recipient the recipient. + */ + public SegmentedDatagramPacket(ByteBuf data, int segmentSize, InetSocketAddress recipient) { + super(data, recipient); + this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize"); + } + + /** + * Create a new instance. + * + * @param data the {@link ByteBuf} which must be continguous. + * @param segmentSize the segment size. + * @param recipient the recipient. + */ + public SegmentedDatagramPacket(ByteBuf data, int segmentSize, + InetSocketAddress recipient, InetSocketAddress sender) { + super(data, recipient, sender); + this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize"); + } + + /** + * Return the size of each segment (the last segment can be smaller). + * + * @return size of segments. + */ + public int segmentSize() { + return segmentSize; + } + + @Override + public SegmentedDatagramPacket copy() { + return new SegmentedDatagramPacket(content().copy(), segmentSize, recipient(), sender()); + } + + @Override + public SegmentedDatagramPacket duplicate() { + return new SegmentedDatagramPacket(content().duplicate(), segmentSize, recipient(), sender()); + } + + @Override + public SegmentedDatagramPacket retainedDuplicate() { + return new SegmentedDatagramPacket(content().retainedDuplicate(), segmentSize, recipient(), sender()); + } + + @Override + public SegmentedDatagramPacket replace(ByteBuf content) { + return new SegmentedDatagramPacket(content, segmentSize, recipient(), sender()); + } + + @Override + public SegmentedDatagramPacket retain() { + super.retain(); + return this; + } + + @Override + public SegmentedDatagramPacket retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public SegmentedDatagramPacket touch() { + super.touch(); + return this; + } + + @Override + public SegmentedDatagramPacket touch(Object hint) { + super.touch(hint); + return this; + } +}