[#2485] Use RecvByteBufAllocator for all allocations related to read from Channel

Motivation:
At the moment we sometimes use only RecvByteBufAllocator.guess() to guess the next size and the use the ByteBufAllocator.* directly to allocate the buffer. We should always use RecvByteBufAllocator.allocate(...) all the time as this makes the behavior easier to adjust.

Modifications:
Change the read() implementations to make use of RecvByteBufAllocator.

Result:
Behavior is more consistent.
This commit is contained in:
Norman Maurer 2014-05-10 15:21:25 +02:00
parent 69c72220aa
commit 59d92ad6cf
3 changed files with 24 additions and 6 deletions

View File

@ -597,12 +597,11 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
boolean close = false; boolean close = false;
try { try {
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0; int totalReadAmount = 0;
for (;;) { for (;;) {
// we use a direct buffer here as the native implementations only be able // we use a direct buffer here as the native implementations only be able
// to handle direct buffers. // to handle direct buffers.
byteBuf = allocator.directBuffer(byteBufCapacity); byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes(); int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf); int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) { if (localReadAmount <= 0) {

View File

@ -106,11 +106,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
int messages = 0; int messages = 0;
boolean close = false; boolean close = false;
try { try {
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0; int totalReadAmount = 0;
boolean readPendingReset = false; boolean readPendingReset = false;
do { do {
byteBuf = allocator.ioBuffer(byteBufCapacity); byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes(); int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf); int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) { if (localReadAmount <= 0) {

View File

@ -23,6 +23,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -32,6 +33,7 @@ import java.io.IOException;
* Abstract base class for OIO which reads and writes bytes from/to a Socket * Abstract base class for OIO which reads and writes bytes from/to a Socket
*/ */
public abstract class AbstractOioByteChannel extends AbstractOioChannel { public abstract class AbstractOioByteChannel extends AbstractOioChannel {
private RecvByteBufAllocator.Handle allocHandle;
private volatile boolean inputShutdown; private volatile boolean inputShutdown;
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
@ -76,13 +78,20 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
final ChannelConfig config = config(); final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
// TODO: calculate size as in 3.x RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
ByteBuf byteBuf = alloc().buffer(); if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = allocHandle.allocate(alloc());
boolean closed = false; boolean closed = false;
boolean read = false; boolean read = false;
Throwable exception = null; Throwable exception = null;
int localReadAmount = 0; int localReadAmount = 0;
try { try {
int totalReadAmount = 0;
for (;;) { for (;;) {
localReadAmount = doReadBytes(byteBuf); localReadAmount = doReadBytes(byteBuf);
if (localReadAmount > 0) { if (localReadAmount > 0) {
@ -114,12 +123,23 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
} }
} }
} }
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
if (!config.isAutoRead()) { if (!config.isAutoRead()) {
// stop reading until next Channel.read() call // stop reading until next Channel.read() call
// See https://github.com/netty/netty/issues/1363 // See https://github.com/netty/netty/issues/1363
break; break;
} }
} }
allocHandle.record(totalReadAmount);
} catch (Throwable t) { } catch (Throwable t) {
exception = t; exception = t;
} finally { } finally {