Continue reading when the number of bytes is less then the configured… (#11089)
... number of bytes when using DatagramChannels Motivation: In our FixedRecvByteBufAllocator we dont continue to read if the number of bytes is less then what was configured. This is correct when using it for TCP but not when using it for UDP. When using UDP the number of bytes is the maximum of what we want to support but we often end up processing smaller datagrams in general. Because of this we should use contineReading(UncheckedBooleanSupplier) to determite if we should continue reading Modifications: - use contineReading(UncheckedBooleanSupplier) for DatagramChannels Result: Read more then once in the general case for DatagramChannels with the default config
This commit is contained in:
parent
817052d019
commit
163b2b659c
@ -37,6 +37,7 @@ import io.netty.channel.unix.Errors.NativeIoException;
|
|||||||
import io.netty.channel.unix.Socket;
|
import io.netty.channel.unix.Socket;
|
||||||
import io.netty.channel.unix.UnixChannelUtil;
|
import io.netty.channel.unix.UnixChannelUtil;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
import io.netty.util.UncheckedBooleanSupplier;
|
||||||
import io.netty.util.internal.RecyclableArrayList;
|
import io.netty.util.internal.RecyclableArrayList;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
@ -498,7 +499,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} while (allocHandle.continueReading());
|
// We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
|
||||||
|
// as we read anything).
|
||||||
|
} while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
exception = t;
|
exception = t;
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ import io.netty.channel.unix.DatagramSocketAddress;
|
|||||||
import io.netty.channel.unix.Errors;
|
import io.netty.channel.unix.Errors;
|
||||||
import io.netty.channel.unix.IovArray;
|
import io.netty.channel.unix.IovArray;
|
||||||
import io.netty.channel.unix.UnixChannelUtil;
|
import io.netty.channel.unix.UnixChannelUtil;
|
||||||
|
import io.netty.util.UncheckedBooleanSupplier;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
import io.netty.util.internal.UnstableApi;
|
import io.netty.util.internal.UnstableApi;
|
||||||
|
|
||||||
@ -472,7 +473,10 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
|
|||||||
pipeline.fireChannelRead(packet);
|
pipeline.fireChannelRead(packet);
|
||||||
|
|
||||||
byteBuf = null;
|
byteBuf = null;
|
||||||
} while (allocHandle.continueReading());
|
|
||||||
|
// We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
|
||||||
|
// as we read anything).
|
||||||
|
} while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
if (byteBuf != null) {
|
if (byteBuf != null) {
|
||||||
byteBuf.release();
|
byteBuf.release();
|
||||||
|
@ -56,6 +56,10 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
|||||||
super.doBeginRead();
|
super.doBeginRead();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
|
||||||
|
return allocHandle.continueReading();
|
||||||
|
}
|
||||||
|
|
||||||
private final class NioMessageUnsafe extends AbstractNioUnsafe {
|
private final class NioMessageUnsafe extends AbstractNioUnsafe {
|
||||||
|
|
||||||
private final List<Object> readBuf = new ArrayList<>();
|
private final List<Object> readBuf = new ArrayList<>();
|
||||||
@ -83,7 +87,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
allocHandle.incMessagesRead(localRead);
|
allocHandle.incMessagesRead(localRead);
|
||||||
} while (allocHandle.continueReading());
|
} while (continueReading(allocHandle));
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
exception = t;
|
exception = t;
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,7 @@ import io.netty.channel.nio.AbstractNioMessageChannel;
|
|||||||
import io.netty.channel.socket.DatagramChannelConfig;
|
import io.netty.channel.socket.DatagramChannelConfig;
|
||||||
import io.netty.channel.socket.DatagramPacket;
|
import io.netty.channel.socket.DatagramPacket;
|
||||||
import io.netty.channel.socket.InternetProtocolFamily;
|
import io.netty.channel.socket.InternetProtocolFamily;
|
||||||
|
import io.netty.util.UncheckedBooleanSupplier;
|
||||||
import io.netty.util.internal.SocketUtils;
|
import io.netty.util.internal.SocketUtils;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
@ -579,4 +580,15 @@ public final class NioDatagramChannel
|
|||||||
}
|
}
|
||||||
return super.closeOnReadError(cause);
|
return super.closeOnReadError(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
|
||||||
|
if (allocHandle instanceof RecvByteBufAllocator.ExtendedHandle) {
|
||||||
|
// We use the TRUE_SUPPLIER as it is also ok to read less then what we did try to read (as long
|
||||||
|
// as we read anything).
|
||||||
|
return ((RecvByteBufAllocator.ExtendedHandle) allocHandle)
|
||||||
|
.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER);
|
||||||
|
}
|
||||||
|
return allocHandle.continueReading();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user