Make NioByteUnsafe.read() respect ChannelConfig.maxMessagesPerRead and adjust the default from 16 to 1
- Fixes #1486 - Decreased the default from 16 to 1 because unnecessary extra read on req-res protocols results in lower throughput due to extra syscalls.
This commit is contained in:
parent
a1632e7d15
commit
1f27c3b390
@ -39,7 +39,7 @@ public class DefaultChannelConfig implements ChannelConfig {
|
|||||||
private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR;
|
private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR;
|
||||||
private volatile RecvByteBufAllocator rcvBufAllocator = DEFAULT_RCVBUF_ALLOCATOR;
|
private volatile RecvByteBufAllocator rcvBufAllocator = DEFAULT_RCVBUF_ALLOCATOR;
|
||||||
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
|
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
|
||||||
private volatile int maxMessagesPerRead = 16;
|
private volatile int maxMessagesPerRead = 1;
|
||||||
private volatile int writeSpinCount = 16;
|
private volatile int writeSpinCount = 16;
|
||||||
private volatile boolean autoRead = true;
|
private volatile boolean autoRead = true;
|
||||||
private volatile int writeBufferHighWaterMark = 64 * 1024;
|
private volatile int writeBufferHighWaterMark = 64 * 1024;
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
package io.netty.channel.nio;
|
package io.netty.channel.nio;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelConfig;
|
import io.netty.channel.ChannelConfig;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
@ -75,25 +76,49 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
|||||||
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
|
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteBuf byteBuf = allocHandle.allocate(config.getAllocator());
|
final ByteBufAllocator allocator = config.getAllocator();
|
||||||
|
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
|
||||||
|
final MessageList<ByteBuf> messages = MessageList.newInstance();
|
||||||
|
|
||||||
boolean closed = false;
|
boolean closed = false;
|
||||||
Throwable exception = null;
|
Throwable exception = null;
|
||||||
|
ByteBuf byteBuf = null;
|
||||||
try {
|
try {
|
||||||
int localReadAmount = doReadBytes(byteBuf);
|
for (;;) {
|
||||||
if (localReadAmount < 0) {
|
byteBuf = allocHandle.allocate(allocator);
|
||||||
closed = true;
|
int localReadAmount = doReadBytes(byteBuf);
|
||||||
|
if (localReadAmount == 0) {
|
||||||
|
byteBuf.release();
|
||||||
|
byteBuf = null;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (localReadAmount < 0) {
|
||||||
|
closed = true;
|
||||||
|
byteBuf.release();
|
||||||
|
byteBuf = null;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
messages.add(byteBuf);
|
||||||
|
allocHandle.record(localReadAmount);
|
||||||
|
byteBuf = null;
|
||||||
|
if (messages.size() == maxMessagesPerRead) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
exception = t;
|
exception = t;
|
||||||
} finally {
|
} finally {
|
||||||
int readBytes = byteBuf.readableBytes();
|
if (byteBuf != null) {
|
||||||
allocHandle.record(readBytes);
|
if (byteBuf.isReadable()) {
|
||||||
if (readBytes != 0) {
|
messages.add(byteBuf);
|
||||||
pipeline.fireMessageReceived(byteBuf);
|
} else {
|
||||||
} else {
|
byteBuf.release();
|
||||||
byteBuf.release();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pipeline.fireMessageReceived(messages);
|
||||||
|
|
||||||
if (exception != null) {
|
if (exception != null) {
|
||||||
if (exception instanceof IOException) {
|
if (exception instanceof IOException) {
|
||||||
closed = true;
|
closed = true;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user