Add ChannelConfig.maxMessagesPerRead and ChannelOption.MAX_MESSAGES_PER_READ

- Fixes #1486
- Make sure AbstractNioMessageChannel.NioMessageUnsafe.read() only up to maxMessagesPerRead
This commit is contained in:
Trustin Lee 2013-06-25 17:49:28 +09:00
parent 58b968b603
commit a1632e7d15
24 changed files with 202 additions and 44 deletions

View File

@ -199,17 +199,26 @@ final class DefaultRxtxChannelConfig extends DefaultChannelConfig implements Rxt
@Override
public RxtxChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (RxtxChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public RxtxChannelConfig setWriteSpinCount(int writeSpinCount) {
return (RxtxChannelConfig) super.setWriteSpinCount(writeSpinCount);
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public RxtxChannelConfig setAllocator(ByteBufAllocator allocator) {
return (RxtxChannelConfig) super.setAllocator(allocator);
super.setAllocator(allocator);
return this;
}
@Override
@ -220,16 +229,19 @@ final class DefaultRxtxChannelConfig extends DefaultChannelConfig implements Rxt
@Override
public RxtxChannelConfig setAutoRead(boolean autoRead) {
return (RxtxChannelConfig) super.setAutoRead(autoRead);
super.setAutoRead(autoRead);
return this;
}
@Override
public RxtxChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (RxtxChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public RxtxChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (RxtxChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
}

View File

@ -272,6 +272,9 @@ public interface RxtxChannelConfig extends ChannelConfig {
@Override
RxtxChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
RxtxChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
RxtxChannelConfig setWriteSpinCount(int writeSpinCount);

View File

@ -176,6 +176,12 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc
return (SctpChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
}
@Override
public SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public SctpChannelConfig setWriteSpinCount(int writeSpinCount) {
return (SctpChannelConfig) super.setWriteSpinCount(writeSpinCount);

View File

@ -155,19 +155,28 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme
return this;
}
@Override
public SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public SctpServerChannelConfig setWriteSpinCount(int writeSpinCount) {
return (SctpServerChannelConfig) super.setWriteSpinCount(writeSpinCount);
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public SctpServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (SctpServerChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public SctpServerChannelConfig setAllocator(ByteBufAllocator allocator) {
return (SctpServerChannelConfig) super.setAllocator(allocator);
super.setAllocator(allocator);
return this;
}
@Override
@ -178,16 +187,19 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme
@Override
public SctpServerChannelConfig setAutoRead(boolean autoRead) {
return (SctpServerChannelConfig) super.setAutoRead(autoRead);
super.setAutoRead(autoRead);
return this;
}
@Override
public SctpServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (SctpServerChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public SctpServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (SctpServerChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
}

View File

@ -99,6 +99,9 @@ public interface SctpChannelConfig extends ChannelConfig {
@Override
SctpChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
SctpChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
SctpChannelConfig setWriteSpinCount(int writeSpinCount);

View File

@ -91,6 +91,9 @@ public interface SctpServerChannelConfig extends ChannelConfig {
*/
SctpServerChannelConfig setInitMaxStreams(InitMaxStreams initMaxStreams);
@Override
SctpServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
SctpServerChannelConfig setWriteSpinCount(int writeSpinCount);

View File

@ -238,6 +238,12 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements
return this;
}
@Override
public UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public UdtChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);

View File

@ -141,6 +141,12 @@ public class DefaultUdtServerChannelConfig extends DefaultUdtChannelConfig
return this;
}
@Override
public UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public UdtServerChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);

View File

@ -114,6 +114,9 @@ public interface UdtChannelConfig extends ChannelConfig {
@Override
UdtChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
UdtChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
UdtChannelConfig setWriteSpinCount(int writeSpinCount);

View File

@ -48,6 +48,9 @@ public interface UdtServerChannelConfig extends UdtChannelConfig {
@Override
UdtServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
UdtServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
UdtServerChannelConfig setWriteSpinCount(int writeSpinCount);

View File

@ -118,6 +118,22 @@ public interface ChannelConfig {
*/
ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
/**
* Returns the maximum number of messages in a {@link MessageList} of
* a {@link ChannelInboundHandler#messageReceived(ChannelHandlerContext, MessageList) messageReceived()} event.
* If this value is greater than 1, an event loop might attempt to read multiple times to fill multiple messages
* into the {@link MessageList}.
*/
int getMaxMessagesPerRead();
/**
* Sets the maximum number of messages in a {@link MessageList} of
* a {@link ChannelInboundHandler#messageReceived(ChannelHandlerContext, MessageList) messageReceived()} event.
* If this value is greater than 1, an event loop might attempt to read multiple times to fill multiple messages
* into the {@link MessageList}.
*/
ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.

View File

@ -42,6 +42,8 @@ public class ChannelOption<T> extends UniqueName {
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS =
new ChannelOption<Integer>("CONNECT_TIMEOUT_MILLIS");
public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ =
new ChannelOption<Integer>("MAX_MESSAGES_PER_READ");
public static final ChannelOption<Integer> WRITE_SPIN_COUNT =
new ChannelOption<Integer>("WRITE_SPIN_COUNT");
public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK =

View File

@ -39,6 +39,7 @@ public class DefaultChannelConfig implements ChannelConfig {
private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR;
private volatile RecvByteBufAllocator rcvBufAllocator = DEFAULT_RCVBUF_ALLOCATOR;
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
private volatile int maxMessagesPerRead = 16;
private volatile int writeSpinCount = 16;
private volatile boolean autoRead = true;
private volatile int writeBufferHighWaterMark = 64 * 1024;
@ -53,7 +54,10 @@ public class DefaultChannelConfig implements ChannelConfig {
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(null, CONNECT_TIMEOUT_MILLIS, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ, RCVBUF_ALLOCATOR);
return getOptions(
null,
CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
ALLOCATOR, AUTO_READ, RCVBUF_ALLOCATOR);
}
protected Map<ChannelOption<?>, Object> getOptions(
@ -94,6 +98,9 @@ public class DefaultChannelConfig implements ChannelConfig {
if (option == CONNECT_TIMEOUT_MILLIS) {
return (T) Integer.valueOf(getConnectTimeoutMillis());
}
if (option == MAX_MESSAGES_PER_READ) {
return (T) Integer.valueOf(getMaxMessagesPerRead());
}
if (option == WRITE_SPIN_COUNT) {
return (T) Integer.valueOf(getWriteSpinCount());
}
@ -116,6 +123,8 @@ public class DefaultChannelConfig implements ChannelConfig {
if (option == CONNECT_TIMEOUT_MILLIS) {
setConnectTimeoutMillis((Integer) value);
} else if (option == MAX_MESSAGES_PER_READ) {
setMaxMessagesPerRead((Integer) value);
} else if (option == WRITE_SPIN_COUNT) {
setWriteSpinCount((Integer) value);
} else if (option == ALLOCATOR) {
@ -153,6 +162,20 @@ public class DefaultChannelConfig implements ChannelConfig {
return this;
}
@Override
public int getMaxMessagesPerRead() {
return maxMessagesPerRead;
}
@Override
public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
if (maxMessagesPerRead <= 0) {
throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)");
}
this.maxMessagesPerRead = maxMessagesPerRead;
return this;
}
@Override
public int getWriteSpinCount() {
return writeSpinCount;

View File

@ -16,6 +16,7 @@
package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageList;
@ -54,29 +55,32 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
}
}
final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final boolean autoRead = config.isAutoRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
MessageList<Object> msgBuf = MessageList.newInstance();
Throwable exception = null;
loop: for (;;) {
try {
for (;;) {
int localRead = doReadMessages(msgBuf);
if (localRead == 0) {
break loop;
}
if (localRead < 0) {
closed = true;
break loop;
}
if (!config().isAutoRead()) {
break loop;
}
int readMessages = 0;
try {
for (;;) {
int localRead = doReadMessages(msgBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
readMessages += localRead;
if (readMessages >= maxMessagesPerRead | !autoRead) {
break;
}
} catch (Throwable t) {
exception = t;
break;
}
} catch (Throwable t) {
exception = t;
}
pipeline.fireMessageReceived(msgBuf);
@ -86,7 +90,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
closed = true;
}
pipeline().fireExceptionCaught(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {

View File

@ -155,6 +155,9 @@ public interface DatagramChannelConfig extends ChannelConfig {
*/
DatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface);
@Override
DatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
DatagramChannelConfig setWriteSpinCount(int writeSpinCount);

View File

@ -349,17 +349,26 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
@Override
public DatagramChannelConfig setWriteSpinCount(int writeSpinCount) {
return (DatagramChannelConfig) super.setWriteSpinCount(writeSpinCount);
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public DatagramChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (DatagramChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public DatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public DatagramChannelConfig setAllocator(ByteBufAllocator allocator) {
return (DatagramChannelConfig) super.setAllocator(allocator);
super.setAllocator(allocator);
return this;
}
@Override
@ -370,16 +379,19 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
@Override
public DatagramChannelConfig setAutoRead(boolean autoRead) {
return (DatagramChannelConfig) super.setAutoRead(autoRead);
super.setAutoRead(autoRead);
return this;
}
@Override
public DatagramChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (DatagramChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public DatagramChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (DatagramChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
}

View File

@ -146,17 +146,26 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
@Override
public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (ServerSocketChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
return (ServerSocketChannelConfig) super.setWriteSpinCount(writeSpinCount);
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
return (ServerSocketChannelConfig) super.setAllocator(allocator);
super.setAllocator(allocator);
return this;
}
@Override
@ -167,16 +176,19 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
@Override
public ServerSocketChannelConfig setAutoRead(boolean autoRead) {
return (ServerSocketChannelConfig) super.setAutoRead(autoRead);
super.setAutoRead(autoRead);
return this;
}
@Override
public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (ServerSocketChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (ServerSocketChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
}

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
@ -284,6 +283,12 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
return (SocketChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis);
}
@Override
public SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public SocketChannelConfig setWriteSpinCount(int writeSpinCount) {
return (SocketChannelConfig) super.setWriteSpinCount(writeSpinCount);

View File

@ -86,6 +86,9 @@ public interface ServerSocketChannelConfig extends ChannelConfig {
@Override
ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount);

View File

@ -159,6 +159,9 @@ public interface SocketChannelConfig extends ChannelConfig {
@Override
SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
SocketChannelConfig setWriteSpinCount(int writeSpinCount);

View File

@ -114,6 +114,12 @@ public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChan
return this;
}
@Override
public OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public OioServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);

View File

@ -142,6 +142,12 @@ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig im
return this;
}
@Override
public OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public OioSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);

View File

@ -65,6 +65,9 @@ public interface OioServerSocketChannelConfig extends ServerSocketChannelConfig
@Override
OioServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
OioServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
OioServerSocketChannelConfig setWriteSpinCount(int writeSpinCount);

View File

@ -80,6 +80,9 @@ public interface OioSocketChannelConfig extends SocketChannelConfig {
@Override
OioSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);
@Override
OioSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);
@Override
OioSocketChannelConfig setWriteSpinCount(int writeSpinCount);