From 163b2b659c628a7e9a91ae08146f7a76ddad8e19 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 17 Mar 2021 13:10:57 +0100 Subject: [PATCH] =?UTF-8?q?Continue=20reading=20when=20the=20number=20of?= =?UTF-8?q?=20bytes=20is=20less=20then=20the=20configured=E2=80=A6=20(#110?= =?UTF-8?q?89)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ... number of bytes when using DatagramChannels Motivation: In our FixedRecvByteBufAllocator we dont continue to read if the number of bytes is less then what was configured. This is correct when using it for TCP but not when using it for UDP. When using UDP the number of bytes is the maximum of what we want to support but we often end up processing smaller datagrams in general. Because of this we should use contineReading(UncheckedBooleanSupplier) to determite if we should continue reading Modifications: - use contineReading(UncheckedBooleanSupplier) for DatagramChannels Result: Read more then once in the general case for DatagramChannels with the default config --- .../io/netty/channel/epoll/EpollDatagramChannel.java | 5 ++++- .../netty/channel/kqueue/KQueueDatagramChannel.java | 6 +++++- .../netty/channel/nio/AbstractNioMessageChannel.java | 6 +++++- .../netty/channel/socket/nio/NioDatagramChannel.java | 12 ++++++++++++ 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index ecd714fed3..e8d3fe79c9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -37,6 +37,7 @@ import io.netty.channel.unix.Errors.NativeIoException; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.ReferenceCountUtil; +import io.netty.util.UncheckedBooleanSupplier; import io.netty.util.internal.RecyclableArrayList; import io.netty.util.internal.StringUtil; @@ -498,7 +499,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } else { break; } - } while (allocHandle.continueReading()); + // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long + // as we read anything). + } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER)); } catch (Throwable t) { exception = t; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index 6efc29b10e..b6757a7c41 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -33,6 +33,7 @@ import io.netty.channel.unix.DatagramSocketAddress; import io.netty.channel.unix.Errors; import io.netty.channel.unix.IovArray; import io.netty.channel.unix.UnixChannelUtil; +import io.netty.util.UncheckedBooleanSupplier; import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; @@ -472,7 +473,10 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement pipeline.fireChannelRead(packet); byteBuf = null; - } while (allocHandle.continueReading()); + + // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long + // as we read anything). + } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER)); } catch (Throwable t) { if (byteBuf != null) { byteBuf.release(); diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 0e57dc46a1..987c576f5f 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -56,6 +56,10 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { super.doBeginRead(); } + protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) { + return allocHandle.continueReading(); + } + private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List readBuf = new ArrayList<>(); @@ -83,7 +87,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { } allocHandle.incMessagesRead(localRead); - } while (allocHandle.continueReading()); + } while (continueReading(allocHandle)); } catch (Throwable t) { exception = t; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index f7848bbf54..2c8f1a48f9 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -32,6 +32,7 @@ import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.util.UncheckedBooleanSupplier; import io.netty.util.internal.SocketUtils; import io.netty.util.internal.StringUtil; @@ -579,4 +580,15 @@ public final class NioDatagramChannel } return super.closeOnReadError(cause); } + + @Override + protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) { + if (allocHandle instanceof RecvByteBufAllocator.ExtendedHandle) { + // We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long + // as we read anything). + return ((RecvByteBufAllocator.ExtendedHandle) allocHandle) + .continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER); + } + return allocHandle.continueReading(); + } }