DefaultChannelConfig maxMessagesPerRead default not always set
Motivation: ChannelMetadata has a field minMaxMessagesPerRead which can be confusing. There are also some cases where static instances are used and the default value for channel type is not being applied. Modifications: - use a default value which is set unconditionally to simplify - make sure static instances of MaxMessagesRecvByteBufAllocator are not used if the intention is that the default maxMessagesPerRead should be derived from the channel type. Result: Less confusing interfaces in ChannelMetadata and ChannelConfig. Default maxMessagesPerRead is correctly applied.
This commit is contained in:
parent
8cd259896e
commit
641505a5d2
@ -495,6 +495,7 @@ public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static class TestChannel extends AbstractChannel {
|
private static class TestChannel extends AbstractChannel {
|
||||||
|
private static final ChannelMetadata TEST_METADATA = new ChannelMetadata(false);
|
||||||
private DefaultChannelConfig config = new DefaultChannelConfig(this);
|
private DefaultChannelConfig config = new DefaultChannelConfig(this);
|
||||||
|
|
||||||
private class TestUnsafe extends AbstractUnsafe {
|
private class TestUnsafe extends AbstractUnsafe {
|
||||||
@ -536,7 +537,7 @@ public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelMetadata metadata() {
|
public ChannelMetadata metadata() {
|
||||||
return null;
|
return TEST_METADATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -55,6 +55,10 @@ public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufA
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
|
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
|
||||||
|
|
||||||
private static int getSizeTableIndex(final int size) {
|
private static int getSizeTableIndex(final int size) {
|
||||||
@ -132,7 +136,7 @@ public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufA
|
|||||||
* parameters, the expected buffer size starts from {@code 1024}, does not
|
* parameters, the expected buffer size starts from {@code 1024}, does not
|
||||||
* go down below {@code 64}, and does not go up above {@code 65536}.
|
* go down below {@code 64}, and does not go up above {@code 65536}.
|
||||||
*/
|
*/
|
||||||
private AdaptiveRecvByteBufAllocator() {
|
public AdaptiveRecvByteBufAllocator() {
|
||||||
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
|
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,7 +23,7 @@ import java.net.SocketAddress;
|
|||||||
public final class ChannelMetadata {
|
public final class ChannelMetadata {
|
||||||
|
|
||||||
private final boolean hasDisconnect;
|
private final boolean hasDisconnect;
|
||||||
private final int minMaxMessagesPerRead;
|
private final int defaultMaxMessagesPerRead;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new instance
|
* Create a new instance
|
||||||
@ -42,15 +42,16 @@ public final class ChannelMetadata {
|
|||||||
* @param hasDisconnect {@code true} if and only if the channel has the {@code disconnect()} operation
|
* @param hasDisconnect {@code true} if and only if the channel has the {@code disconnect()} operation
|
||||||
* that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)}
|
* that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)}
|
||||||
* again, such as UDP/IP.
|
* again, such as UDP/IP.
|
||||||
* @param minMaxMessagesPerRead If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this is the minimum
|
* @param defaultMaxMessagesPerRead If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this value will be
|
||||||
* value enforced for {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. Must be {@code > 0}.
|
* set for {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. Must be {@code > 0}.
|
||||||
*/
|
*/
|
||||||
public ChannelMetadata(boolean hasDisconnect, int minMaxMessagesPerRead) {
|
public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) {
|
||||||
if (minMaxMessagesPerRead <= 0) {
|
if (defaultMaxMessagesPerRead <= 0) {
|
||||||
throw new IllegalArgumentException("minMaxMessagesPerRead: " + minMaxMessagesPerRead + " (expected > 0)");
|
throw new IllegalArgumentException("defaultMaxMessagesPerRead: " + defaultMaxMessagesPerRead +
|
||||||
|
" (expected > 0)");
|
||||||
}
|
}
|
||||||
this.hasDisconnect = hasDisconnect;
|
this.hasDisconnect = hasDisconnect;
|
||||||
this.minMaxMessagesPerRead = minMaxMessagesPerRead;
|
this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -63,10 +64,10 @@ public final class ChannelMetadata {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this is the minimum value enforced for
|
* If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this is the default value for
|
||||||
* {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}.
|
* {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}.
|
||||||
*/
|
*/
|
||||||
public int minMaxMessagesPerRead() {
|
public int defaultMaxMessagesPerRead() {
|
||||||
return minMaxMessagesPerRead;
|
return defaultMaxMessagesPerRead;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,13 +34,12 @@ import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
|
|||||||
import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
|
import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
|
||||||
import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
|
import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK;
|
||||||
import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT;
|
import static io.netty.channel.ChannelOption.WRITE_SPIN_COUNT;
|
||||||
|
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default {@link SocketChannelConfig} implementation.
|
* The default {@link SocketChannelConfig} implementation.
|
||||||
*/
|
*/
|
||||||
public class DefaultChannelConfig implements ChannelConfig {
|
public class DefaultChannelConfig implements ChannelConfig {
|
||||||
|
|
||||||
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = AdaptiveRecvByteBufAllocator.DEFAULT;
|
|
||||||
private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
|
private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
|
||||||
|
|
||||||
private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
|
private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
|
||||||
@ -59,7 +58,7 @@ public class DefaultChannelConfig implements ChannelConfig {
|
|||||||
protected final Channel channel;
|
protected final Channel channel;
|
||||||
|
|
||||||
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
|
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
|
||||||
private volatile RecvByteBufAllocator rcvBufAllocator = DEFAULT_RCVBUF_ALLOCATOR;
|
private volatile RecvByteBufAllocator rcvBufAllocator;
|
||||||
private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
|
private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
|
||||||
|
|
||||||
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
|
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
|
||||||
@ -71,9 +70,11 @@ public class DefaultChannelConfig implements ChannelConfig {
|
|||||||
private volatile int writeBufferLowWaterMark = 32 * 1024;
|
private volatile int writeBufferLowWaterMark = 32 * 1024;
|
||||||
|
|
||||||
public DefaultChannelConfig(Channel channel) {
|
public DefaultChannelConfig(Channel channel) {
|
||||||
if (channel == null) {
|
this(channel, new AdaptiveRecvByteBufAllocator());
|
||||||
throw new NullPointerException("channel");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
|
||||||
|
setRecvByteBufAllocator(allocator, channel.metadata());
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -281,40 +282,26 @@ public class DefaultChannelConfig implements ChannelConfig {
|
|||||||
return (T) rcvBufAllocator;
|
return (T) rcvBufAllocator;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
* <p>
|
|
||||||
* This method enforces the {@link ChannelMetadata#minMaxMessagesPerRead()}. If these predetermined limits
|
|
||||||
* are not appropriate for your use case consider extending the channel and overriding {@link Channel#metadata()},
|
|
||||||
* or use {@link #setRecvByteBufAllocator(RecvByteBufAllocator, ChannelMetadata)}.
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
|
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
|
||||||
return setRecvByteBufAllocator(allocator, channel.metadata());
|
rcvBufAllocator = checkNotNull(allocator, "allocator");
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
|
* Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
|
||||||
* @param allocator the allocator to set.
|
* @param allocator the allocator to set.
|
||||||
* @param metadata Used to determine the {@link ChannelMetadata#minMaxMessagesPerRead()} if {@code allocator}
|
* @param metadata Used to set the {@link ChannelMetadata#defaultMaxMessagesPerRead()} if {@code allocator}
|
||||||
* is of type {@link MaxMessagesRecvByteBufAllocator}.
|
* is of type {@link MaxMessagesRecvByteBufAllocator}.
|
||||||
* @return this
|
* @return this
|
||||||
*/
|
*/
|
||||||
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
|
private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
|
||||||
if (allocator == null) {
|
if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
|
||||||
|
((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
|
||||||
|
} else if (allocator == null) {
|
||||||
throw new NullPointerException("allocator");
|
throw new NullPointerException("allocator");
|
||||||
}
|
}
|
||||||
if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
|
|
||||||
if (metadata == null) {
|
|
||||||
throw new NullPointerException("metadata");
|
|
||||||
}
|
|
||||||
MaxMessagesRecvByteBufAllocator maxMsgAllocator = (MaxMessagesRecvByteBufAllocator) allocator;
|
|
||||||
if (maxMsgAllocator.maxMessagesPerRead() < metadata.minMaxMessagesPerRead()) {
|
|
||||||
maxMsgAllocator.maxMessagesPerRead(metadata.minMaxMessagesPerRead());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rcvBufAllocator = allocator;
|
rcvBufAllocator = allocator;
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -43,8 +43,6 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
|
|||||||
|
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultDatagramChannelConfig.class);
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultDatagramChannelConfig.class);
|
||||||
|
|
||||||
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048);
|
|
||||||
|
|
||||||
private final DatagramSocket javaSocket;
|
private final DatagramSocket javaSocket;
|
||||||
private volatile boolean activeOnOpen;
|
private volatile boolean activeOnOpen;
|
||||||
|
|
||||||
@ -52,12 +50,11 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
|
|||||||
* Creates a new instance.
|
* Creates a new instance.
|
||||||
*/
|
*/
|
||||||
public DefaultDatagramChannelConfig(DatagramChannel channel, DatagramSocket javaSocket) {
|
public DefaultDatagramChannelConfig(DatagramChannel channel, DatagramSocket javaSocket) {
|
||||||
super(channel);
|
super(channel, new FixedRecvByteBufAllocator(2048));
|
||||||
if (javaSocket == null) {
|
if (javaSocket == null) {
|
||||||
throw new NullPointerException("javaSocket");
|
throw new NullPointerException("javaSocket");
|
||||||
}
|
}
|
||||||
this.javaSocket = javaSocket;
|
this.javaSocket = javaSocket;
|
||||||
setRecvByteBufAllocator(DEFAULT_RCVBUF_ALLOCATOR);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -99,7 +99,7 @@ public class AbstractChannelTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static class TestChannel extends AbstractChannel {
|
private static class TestChannel extends AbstractChannel {
|
||||||
|
private static final ChannelMetadata TEST_METADATA = new ChannelMetadata(false);
|
||||||
private class TestUnsafe extends AbstractUnsafe {
|
private class TestUnsafe extends AbstractUnsafe {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -127,7 +127,7 @@ public class AbstractChannelTest {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelMetadata metadata() {
|
public ChannelMetadata metadata() {
|
||||||
return null;
|
return TEST_METADATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -127,6 +127,7 @@ public class ChannelOutboundBufferTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static final class TestChannel extends AbstractChannel {
|
private static final class TestChannel extends AbstractChannel {
|
||||||
|
private static final ChannelMetadata TEST_METADATA = new ChannelMetadata(false);
|
||||||
private final ChannelConfig config = new DefaultChannelConfig(this);
|
private final ChannelConfig config = new DefaultChannelConfig(this);
|
||||||
|
|
||||||
TestChannel() {
|
TestChannel() {
|
||||||
@ -195,7 +196,7 @@ public class ChannelOutboundBufferTest {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelMetadata metadata() {
|
public ChannelMetadata metadata() {
|
||||||
throw new UnsupportedOperationException();
|
return TEST_METADATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
final class TestUnsafe extends AbstractUnsafe {
|
final class TestUnsafe extends AbstractUnsafe {
|
||||||
|
Loading…
Reference in New Issue
Block a user