[#2485] Use RecvByteBufAllocator for all allocations related to read from Channel
Motivation: At the moment we sometimes use only RecvByteBufAllocator.guess() to guess the next size and the use the ByteBufAllocator.* directly to allocate the buffer. We should always use RecvByteBufAllocator.allocate(...) all the time as this makes the behavior easier to adjust. Modifications: Change the read() implementations to make use of RecvByteBufAllocator. Result: Behavior is more consistent.
This commit is contained in:
parent
137080c595
commit
dd0782990b
@ -553,12 +553,11 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
ByteBuf byteBuf = null;
|
||||
boolean close = false;
|
||||
try {
|
||||
int byteBufCapacity = allocHandle.guess();
|
||||
int totalReadAmount = 0;
|
||||
for (;;) {
|
||||
// we use a direct buffer here as the native implementations only be able
|
||||
// to handle direct buffers.
|
||||
byteBuf = allocator.directBuffer(byteBufCapacity);
|
||||
byteBuf = allocHandle.allocate(allocator);
|
||||
int writable = byteBuf.writableBytes();
|
||||
int localReadAmount = doReadBytes(byteBuf);
|
||||
if (localReadAmount <= 0) {
|
||||
|
@ -106,11 +106,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
int messages = 0;
|
||||
boolean close = false;
|
||||
try {
|
||||
int byteBufCapacity = allocHandle.guess();
|
||||
int totalReadAmount = 0;
|
||||
boolean readPendingReset = false;
|
||||
do {
|
||||
byteBuf = allocator.ioBuffer(byteBufCapacity);
|
||||
byteBuf = allocHandle.allocate(allocator);
|
||||
int writable = byteBuf.writableBytes();
|
||||
int localReadAmount = doReadBytes(byteBuf);
|
||||
if (localReadAmount <= 0) {
|
||||
|
@ -23,6 +23,7 @@ import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.FileRegion;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
@ -32,6 +33,7 @@ import java.io.IOException;
|
||||
* Abstract base class for OIO which reads and writes bytes from/to a Socket
|
||||
*/
|
||||
public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
||||
private RecvByteBufAllocator.Handle allocHandle;
|
||||
|
||||
private volatile boolean inputShutdown;
|
||||
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
|
||||
@ -76,13 +78,20 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
||||
final ChannelConfig config = config();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
|
||||
// TODO: calculate size as in 3.x
|
||||
ByteBuf byteBuf = alloc().buffer();
|
||||
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
|
||||
if (allocHandle == null) {
|
||||
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
|
||||
}
|
||||
|
||||
ByteBuf byteBuf = allocHandle.allocate(alloc());
|
||||
|
||||
boolean closed = false;
|
||||
boolean read = false;
|
||||
Throwable exception = null;
|
||||
int localReadAmount = 0;
|
||||
try {
|
||||
int totalReadAmount = 0;
|
||||
|
||||
for (;;) {
|
||||
localReadAmount = doReadBytes(byteBuf);
|
||||
if (localReadAmount > 0) {
|
||||
@ -114,12 +123,23 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
|
||||
// Avoid overflow.
|
||||
totalReadAmount = Integer.MAX_VALUE;
|
||||
break;
|
||||
}
|
||||
|
||||
totalReadAmount += localReadAmount;
|
||||
|
||||
if (!config.isAutoRead()) {
|
||||
// stop reading until next Channel.read() call
|
||||
// See https://github.com/netty/netty/issues/1363
|
||||
break;
|
||||
}
|
||||
}
|
||||
allocHandle.record(totalReadAmount);
|
||||
|
||||
} catch (Throwable t) {
|
||||
exception = t;
|
||||
} finally {
|
||||
|
Loading…
Reference in New Issue
Block a user