Ensure ChannelOptions are applied in the same order as configured in *Bootstrap (#9998)
Motivation: https://github.com/netty/netty/pull/9458 changed how we handled ChannelOptions internally to use a ConcurrentHashMap. This unfortunally had the side-effect that the ordering may be affected and not stable anymore. Here the problem is that sometimes we do validation based on two different ChannelOptions (for example we validate high and low watermarks against each other). Thus even if the user specified the options in the same order we may fail to configure them. Modifications: - Use again a LinkedHashMap to preserve order - Add unit test Result: Apply ChannelOptions in correct and expected order
This commit is contained in:
parent
4db1bdacb9
commit
e7e999373f
@ -36,6 +36,7 @@ import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@ -55,8 +56,12 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
|
||||
volatile EventLoopGroup group;
|
||||
private volatile SocketAddress localAddress;
|
||||
private final Map<ChannelOption<?>, Object> options = new ConcurrentHashMap<>();
|
||||
|
||||
// The order in which ChannelOptions are applied is important they may depend on each other for validation
|
||||
// purposes.
|
||||
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<>();
|
||||
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<>();
|
||||
|
||||
private volatile ChannelHandler handler;
|
||||
|
||||
AbstractBootstrap() {
|
||||
@ -67,7 +72,9 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
group = bootstrap.group;
|
||||
handler = bootstrap.handler;
|
||||
localAddress = bootstrap.localAddress;
|
||||
options.putAll(bootstrap.options);
|
||||
synchronized (bootstrap.options) {
|
||||
options.putAll(bootstrap.options);
|
||||
}
|
||||
attrs.putAll(bootstrap.attrs);
|
||||
}
|
||||
|
||||
@ -124,10 +131,12 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
*/
|
||||
public <T> B option(ChannelOption<T> option, T value) {
|
||||
requireNonNull(option, "option");
|
||||
if (value == null) {
|
||||
options.remove(option);
|
||||
} else {
|
||||
options.put(option, value);
|
||||
synchronized (options) {
|
||||
if (value == null) {
|
||||
options.remove(option);
|
||||
} else {
|
||||
options.put(option, value);
|
||||
}
|
||||
}
|
||||
return self();
|
||||
}
|
||||
@ -311,6 +320,12 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
*/
|
||||
public abstract AbstractBootstrapConfig<B, C, F> config();
|
||||
|
||||
final Map.Entry<ChannelOption<?>, Object>[] newOptionsArray() {
|
||||
synchronized (options) {
|
||||
return options.entrySet().toArray(EMPTY_OPTION_ARRAY);
|
||||
}
|
||||
}
|
||||
|
||||
final Map<ChannelOption<?>, Object> options0() {
|
||||
return options;
|
||||
}
|
||||
@ -328,7 +343,9 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
}
|
||||
|
||||
final Map<ChannelOption<?>, Object> options() {
|
||||
return copiedMap(options);
|
||||
synchronized (options) {
|
||||
return copiedMap(options);
|
||||
}
|
||||
}
|
||||
|
||||
final Map<AttributeKey<?>, Object> attrs() {
|
||||
|
@ -277,7 +277,7 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel, ChannelFact
|
||||
ChannelPipeline p = channel.pipeline();
|
||||
p.addLast(config.handler());
|
||||
|
||||
setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
|
||||
setChannelOptions(channel, newOptionsArray(), logger);
|
||||
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
|
||||
|
||||
return promise.setSuccess();
|
||||
|
@ -36,6 +36,7 @@ import io.netty.util.AttributeKey;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -50,7 +51,9 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
|
||||
|
||||
private final Map<ChannelOption<?>, Object> childOptions = new ConcurrentHashMap<>();
|
||||
// The order in which child ChannelOptions are applied is important they may depend on each other for validation
|
||||
// purposes.
|
||||
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<>();
|
||||
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<>();
|
||||
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
|
||||
private volatile EventLoopGroup childGroup;
|
||||
@ -64,7 +67,9 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
childGroup = bootstrap.childGroup;
|
||||
childHandler = bootstrap.childHandler;
|
||||
channelFactory = bootstrap.channelFactory;
|
||||
childOptions.putAll(bootstrap.childOptions);
|
||||
synchronized (bootstrap.childOptions) {
|
||||
childOptions.putAll(bootstrap.childOptions);
|
||||
}
|
||||
childAttrs.putAll(bootstrap.childAttrs);
|
||||
}
|
||||
|
||||
@ -98,10 +103,12 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
*/
|
||||
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
|
||||
requireNonNull(childOption, "childOption");
|
||||
if (value == null) {
|
||||
childOptions.remove(childOption);
|
||||
} else {
|
||||
childOptions.put(childOption, value);
|
||||
synchronized (childOptions) {
|
||||
if (value == null) {
|
||||
childOptions.remove(childOption);
|
||||
} else {
|
||||
childOptions.put(childOption, value);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
@ -159,14 +166,16 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
@Override
|
||||
ChannelFuture init(Channel channel) {
|
||||
final ChannelPromise promise = channel.newPromise();
|
||||
setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
|
||||
setChannelOptions(channel, newOptionsArray(), logger);
|
||||
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
|
||||
|
||||
ChannelPipeline p = channel.pipeline();
|
||||
|
||||
final ChannelHandler currentChildHandler = childHandler;
|
||||
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
|
||||
childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
|
||||
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
|
||||
synchronized (childOptions) {
|
||||
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
|
||||
}
|
||||
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
|
||||
|
||||
p.addLast(new ChannelInitializer<Channel>() {
|
||||
@ -308,7 +317,9 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
}
|
||||
|
||||
final Map<ChannelOption<?>, Object> childOptions() {
|
||||
return copiedMap(childOptions);
|
||||
synchronized (childOptions) {
|
||||
return copiedMap(childOptions);
|
||||
}
|
||||
}
|
||||
|
||||
final Map<AttributeKey<?>, Object> childAttrs() {
|
||||
|
@ -16,12 +16,19 @@
|
||||
|
||||
package io.netty.bootstrap;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelFactory;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.local.LocalAddress;
|
||||
@ -56,6 +63,7 @@ import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class BootstrapTest {
|
||||
@ -283,6 +291,56 @@ public class BootstrapTest {
|
||||
assertThat(connectFuture.channel(), is(not(nullValue())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChannelOptionOrderPreserve() throws InterruptedException {
|
||||
final BlockingQueue<ChannelOption<?>> options = new LinkedBlockingQueue<>();
|
||||
class ChannelConfigValidator extends DefaultChannelConfig {
|
||||
ChannelConfigValidator(Channel channel) {
|
||||
super(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean setOption(ChannelOption<T> option, T value) {
|
||||
options.add(option);
|
||||
return super.setOption(option, value);
|
||||
}
|
||||
}
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final Bootstrap bootstrap = new Bootstrap()
|
||||
.handler(new ChannelInitializer() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) {
|
||||
latch.countDown();
|
||||
}
|
||||
})
|
||||
.group(groupA)
|
||||
.channelFactory(new ChannelFactory<Channel>() {
|
||||
@Override
|
||||
public Channel newChannel(EventLoop eventLoop) {
|
||||
return new LocalChannel(eventLoop) {
|
||||
private ChannelConfigValidator config;
|
||||
@Override
|
||||
public synchronized ChannelConfig config() {
|
||||
if (config == null) {
|
||||
config = new ChannelConfigValidator(this);
|
||||
}
|
||||
return config;
|
||||
}
|
||||
};
|
||||
}
|
||||
})
|
||||
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 1)
|
||||
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2);
|
||||
|
||||
bootstrap.register().syncUninterruptibly();
|
||||
|
||||
latch.await();
|
||||
|
||||
// Check the order is the same as what we defined before.
|
||||
assertSame(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, options.take());
|
||||
assertSame(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, options.take());
|
||||
}
|
||||
|
||||
private static final class LateRegisterHandler implements ChannelHandler {
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
Loading…
x
Reference in New Issue
Block a user