Replace synchronized usage with ConcurrentHashMap in *Bootstrap classes (#9458)
Motivation: In AbstractBoostrap, options and attrs are LinkedHashMap that are synchronized on for every read, copy/clone, write operation. When a lot of connections are triggered concurrently on the same bootstrap instance, the synchronized blocks lead to contention, Netty IO threads get blocked, and performance may be severely degraded. Modifications: Use ConcurrentHashMap Result: Less contention. Fixes https://github.com/netty/netty/issues/9426
This commit is contained in:
parent
ad7c554f5a
commit
089c6daff2
@ -35,8 +35,9 @@ import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
|
||||
@ -50,8 +51,8 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
|
||||
volatile EventLoopGroup group;
|
||||
private volatile SocketAddress localAddress;
|
||||
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<>();
|
||||
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<>();
|
||||
private final Map<ChannelOption<?>, Object> options = new ConcurrentHashMap<>();
|
||||
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<>();
|
||||
private volatile ChannelHandler handler;
|
||||
|
||||
AbstractBootstrap() {
|
||||
@ -62,12 +63,8 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
group = bootstrap.group;
|
||||
handler = bootstrap.handler;
|
||||
localAddress = bootstrap.localAddress;
|
||||
synchronized (bootstrap.options) {
|
||||
options.putAll(bootstrap.options);
|
||||
}
|
||||
synchronized (bootstrap.attrs) {
|
||||
attrs.putAll(bootstrap.attrs);
|
||||
}
|
||||
options.putAll(bootstrap.options);
|
||||
attrs.putAll(bootstrap.attrs);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -124,13 +121,9 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
public <T> B option(ChannelOption<T> option, T value) {
|
||||
requireNonNull(option, "option");
|
||||
if (value == null) {
|
||||
synchronized (options) {
|
||||
options.remove(option);
|
||||
}
|
||||
options.remove(option);
|
||||
} else {
|
||||
synchronized (options) {
|
||||
options.put(option, value);
|
||||
}
|
||||
options.put(option, value);
|
||||
}
|
||||
return self();
|
||||
}
|
||||
@ -142,13 +135,9 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
public <T> B attr(AttributeKey<T> key, T value) {
|
||||
requireNonNull(key, "key");
|
||||
if (value == null) {
|
||||
synchronized (attrs) {
|
||||
attrs.remove(key);
|
||||
}
|
||||
attrs.remove(key);
|
||||
} else {
|
||||
synchronized (attrs) {
|
||||
attrs.put(key, value);
|
||||
}
|
||||
attrs.put(key, value);
|
||||
}
|
||||
return self();
|
||||
}
|
||||
@ -318,17 +307,6 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
*/
|
||||
public abstract AbstractBootstrapConfig<B, C, F> config();
|
||||
|
||||
static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
|
||||
final Map<K, V> copied;
|
||||
synchronized (map) {
|
||||
if (map.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
copied = new LinkedHashMap<>(map);
|
||||
}
|
||||
return Collections.unmodifiableMap(copied);
|
||||
}
|
||||
|
||||
final Map<ChannelOption<?>, Object> options0() {
|
||||
return options;
|
||||
}
|
||||
@ -353,10 +331,18 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
return copiedMap(attrs);
|
||||
}
|
||||
|
||||
static void setChannelOptions(
|
||||
Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
|
||||
for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
|
||||
setChannelOption(channel, e.getKey(), e.getValue(), logger);
|
||||
static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
|
||||
if (map.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
return Collections.unmodifiableMap(new HashMap<>(map));
|
||||
}
|
||||
|
||||
static void setAttributes(Channel channel, Map.Entry<AttributeKey<?>, Object>[] attrs) {
|
||||
for (Map.Entry<AttributeKey<?>, Object> e: attrs) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
|
||||
channel.attr(key).set(e.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@ -367,6 +353,16 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static Map.Entry<AttributeKey<?>, Object>[] newAttrArray(int size) {
|
||||
return new Map.Entry[size];
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static Map.Entry<ChannelOption<?>, Object>[] newOptionArray(int size) {
|
||||
return new Map.Entry[size];
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void setChannelOption(
|
||||
Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
|
||||
|
@ -21,7 +21,6 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFactory;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
@ -31,7 +30,6 @@ import io.netty.resolver.AddressResolver;
|
||||
import io.netty.resolver.DefaultAddressResolverGroup;
|
||||
import io.netty.resolver.NameResolver;
|
||||
import io.netty.resolver.AddressResolverGroup;
|
||||
import io.netty.util.AttributeKey;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -40,8 +38,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
/**
|
||||
* A {@link Bootstrap} that makes it easy to bootstrap a {@link Channel} to use
|
||||
@ -276,23 +272,14 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel, ChannelFact
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
ChannelFuture init(Channel channel) {
|
||||
ChannelPromise promise = channel.newPromise();
|
||||
ChannelPipeline p = channel.pipeline();
|
||||
p.addLast(config.handler());
|
||||
|
||||
final Map<ChannelOption<?>, Object> options = options0();
|
||||
synchronized (options) {
|
||||
setChannelOptions(channel, options, logger);
|
||||
}
|
||||
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
|
||||
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
|
||||
|
||||
final Map<AttributeKey<?>, Object> attrs = attrs0();
|
||||
synchronized (attrs) {
|
||||
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
|
||||
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
|
||||
}
|
||||
}
|
||||
return promise.setSuccess();
|
||||
}
|
||||
|
||||
|
@ -37,9 +37,9 @@ import io.netty.util.AttributeKey;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
@ -51,8 +51,8 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
|
||||
|
||||
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<>();
|
||||
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<>();
|
||||
private final Map<ChannelOption<?>, Object> childOptions = new ConcurrentHashMap<>();
|
||||
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<>();
|
||||
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
|
||||
private volatile EventLoopGroup childGroup;
|
||||
private volatile ChannelHandler childHandler;
|
||||
@ -65,12 +65,8 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
childGroup = bootstrap.childGroup;
|
||||
childHandler = bootstrap.childHandler;
|
||||
channelFactory = bootstrap.channelFactory;
|
||||
synchronized (bootstrap.childOptions) {
|
||||
childOptions.putAll(bootstrap.childOptions);
|
||||
}
|
||||
synchronized (bootstrap.childAttrs) {
|
||||
childAttrs.putAll(bootstrap.childAttrs);
|
||||
}
|
||||
childOptions.putAll(bootstrap.childOptions);
|
||||
childAttrs.putAll(bootstrap.childAttrs);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -104,13 +100,9 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
|
||||
requireNonNull(childOption, "childOption");
|
||||
if (value == null) {
|
||||
synchronized (childOptions) {
|
||||
childOptions.remove(childOption);
|
||||
}
|
||||
childOptions.remove(childOption);
|
||||
} else {
|
||||
synchronized (childOptions) {
|
||||
childOptions.put(childOption, value);
|
||||
}
|
||||
childOptions.put(childOption, value);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
@ -168,36 +160,19 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
@Override
|
||||
ChannelFuture init(Channel channel) {
|
||||
final ChannelPromise promise = channel.newPromise();
|
||||
|
||||
final Map<ChannelOption<?>, Object> options = options0();
|
||||
synchronized (options) {
|
||||
setChannelOptions(channel, options, logger);
|
||||
}
|
||||
|
||||
final Map<AttributeKey<?>, Object> attrs = attrs0();
|
||||
synchronized (attrs) {
|
||||
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
|
||||
channel.attr(key).set(e.getValue());
|
||||
}
|
||||
}
|
||||
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
|
||||
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
|
||||
|
||||
ChannelPipeline p = channel.pipeline();
|
||||
|
||||
final ChannelHandler currentChildHandler = childHandler;
|
||||
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
|
||||
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
|
||||
synchronized (childOptions) {
|
||||
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
|
||||
}
|
||||
synchronized (childAttrs) {
|
||||
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
|
||||
}
|
||||
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
|
||||
childOptions.entrySet().toArray(newOptionArray(0));
|
||||
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
|
||||
|
||||
p.addLast(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(final Channel ch) throws Exception {
|
||||
public void initChannel(final Channel ch) {
|
||||
final ChannelPipeline pipeline = ch.pipeline();
|
||||
ChannelHandler handler = config.handler();
|
||||
if (handler != null) {
|
||||
@ -235,16 +210,6 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Entry<AttributeKey<?>, Object>[] newAttrArray(int size) {
|
||||
return new Entry[size];
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Map.Entry<ChannelOption<?>, Object>[] newOptionArray(int size) {
|
||||
return new Map.Entry[size];
|
||||
}
|
||||
|
||||
private static class ServerBootstrapAcceptor implements ChannelInboundHandler {
|
||||
|
||||
private final ChannelHandler childHandler;
|
||||
@ -284,17 +249,13 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void initChild(final Channel child) {
|
||||
assert child.eventLoop().inEventLoop();
|
||||
try {
|
||||
child.pipeline().addLast(childHandler);
|
||||
|
||||
setChannelOptions(child, childOptions, logger);
|
||||
|
||||
for (Entry<AttributeKey<?>, Object> e : childAttrs) {
|
||||
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
|
||||
}
|
||||
setAttributes(child, childAttrs);
|
||||
|
||||
child.register().addListener((ChannelFutureListener) future -> {
|
||||
if (!future.isSuccess()) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user