Replace synchronized usage with ConcurrentHashMap in *Bootstrap classes (#9458)
Motivation: In AbstractBoostrap, options and attrs are LinkedHashMap that are synchronized on for every read, copy/clone, write operation. When a lot of connections are triggered concurrently on the same bootstrap instance, the synchronized blocks lead to contention, Netty IO threads get blocked, and performance may be severely degraded. Modifications: Use ConcurrentHashMap Result: Less contention. Fixes https://github.com/netty/netty/issues/9426
This commit is contained in:
parent
10eb2cd2e6
commit
97361fa2c8
@ -38,8 +38,9 @@ import java.net.InetAddress;
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
|
* {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
|
||||||
@ -54,8 +55,8 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private volatile ChannelFactory<? extends C> channelFactory;
|
private volatile ChannelFactory<? extends C> channelFactory;
|
||||||
private volatile SocketAddress localAddress;
|
private volatile SocketAddress localAddress;
|
||||||
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
|
private final Map<ChannelOption<?>, Object> options = new ConcurrentHashMap<ChannelOption<?>, Object>();
|
||||||
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
|
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
|
||||||
private volatile ChannelHandler handler;
|
private volatile ChannelHandler handler;
|
||||||
|
|
||||||
AbstractBootstrap() {
|
AbstractBootstrap() {
|
||||||
@ -67,13 +68,9 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
channelFactory = bootstrap.channelFactory;
|
channelFactory = bootstrap.channelFactory;
|
||||||
handler = bootstrap.handler;
|
handler = bootstrap.handler;
|
||||||
localAddress = bootstrap.localAddress;
|
localAddress = bootstrap.localAddress;
|
||||||
synchronized (bootstrap.options) {
|
|
||||||
options.putAll(bootstrap.options);
|
options.putAll(bootstrap.options);
|
||||||
}
|
|
||||||
synchronized (bootstrap.attrs) {
|
|
||||||
attrs.putAll(bootstrap.attrs);
|
attrs.putAll(bootstrap.attrs);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
|
* The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
|
||||||
@ -166,14 +163,10 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
public <T> B option(ChannelOption<T> option, T value) {
|
public <T> B option(ChannelOption<T> option, T value) {
|
||||||
ObjectUtil.checkNotNull(option, "option");
|
ObjectUtil.checkNotNull(option, "option");
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
synchronized (options) {
|
|
||||||
options.remove(option);
|
options.remove(option);
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
synchronized (options) {
|
|
||||||
options.put(option, value);
|
options.put(option, value);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return self();
|
return self();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -184,14 +177,10 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
public <T> B attr(AttributeKey<T> key, T value) {
|
public <T> B attr(AttributeKey<T> key, T value) {
|
||||||
ObjectUtil.checkNotNull(key, "key");
|
ObjectUtil.checkNotNull(key, "key");
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
synchronized (attrs) {
|
|
||||||
attrs.remove(key);
|
attrs.remove(key);
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
synchronized (attrs) {
|
|
||||||
attrs.put(key, value);
|
attrs.put(key, value);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return self();
|
return self();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -384,17 +373,6 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
*/
|
*/
|
||||||
public abstract AbstractBootstrapConfig<B, C> config();
|
public abstract AbstractBootstrapConfig<B, C> config();
|
||||||
|
|
||||||
static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
|
|
||||||
final Map<K, V> copied;
|
|
||||||
synchronized (map) {
|
|
||||||
if (map.isEmpty()) {
|
|
||||||
return Collections.emptyMap();
|
|
||||||
}
|
|
||||||
copied = new LinkedHashMap<K, V>(map);
|
|
||||||
}
|
|
||||||
return Collections.unmodifiableMap(copied);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<ChannelOption<?>, Object> options0() {
|
final Map<ChannelOption<?>, Object> options0() {
|
||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
@ -424,10 +402,18 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
return copiedMap(attrs);
|
return copiedMap(attrs);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setChannelOptions(
|
static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
|
||||||
Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
|
if (map.isEmpty()) {
|
||||||
for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
|
return Collections.emptyMap();
|
||||||
setChannelOption(channel, e.getKey(), e.getValue(), logger);
|
}
|
||||||
|
return Collections.unmodifiableMap(new HashMap<K, V>(map));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setAttributes(Channel channel, Map.Entry<AttributeKey<?>, Object>[] attrs) {
|
||||||
|
for (Map.Entry<AttributeKey<?>, Object> e: attrs) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
|
||||||
|
channel.attr(key).set(e.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -438,6 +424,16 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
static Map.Entry<AttributeKey<?>, Object>[] newAttrArray(int size) {
|
||||||
|
return new Map.Entry[size];
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
static Map.Entry<ChannelOption<?>, Object>[] newOptionArray(int size) {
|
||||||
|
return new Map.Entry[size];
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private static void setChannelOption(
|
private static void setChannelOption(
|
||||||
Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
|
Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
|
||||||
|
@ -18,7 +18,6 @@ package io.netty.bootstrap;
|
|||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.ChannelOption;
|
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
@ -27,7 +26,6 @@ import io.netty.resolver.AddressResolver;
|
|||||||
import io.netty.resolver.DefaultAddressResolverGroup;
|
import io.netty.resolver.DefaultAddressResolverGroup;
|
||||||
import io.netty.resolver.NameResolver;
|
import io.netty.resolver.NameResolver;
|
||||||
import io.netty.resolver.AddressResolverGroup;
|
import io.netty.resolver.AddressResolverGroup;
|
||||||
import io.netty.util.AttributeKey;
|
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Future;
|
||||||
import io.netty.util.concurrent.FutureListener;
|
import io.netty.util.concurrent.FutureListener;
|
||||||
import io.netty.util.internal.ObjectUtil;
|
import io.netty.util.internal.ObjectUtil;
|
||||||
@ -256,21 +254,12 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
void init(Channel channel) throws Exception {
|
void init(Channel channel) {
|
||||||
ChannelPipeline p = channel.pipeline();
|
ChannelPipeline p = channel.pipeline();
|
||||||
p.addLast(config.handler());
|
p.addLast(config.handler());
|
||||||
|
|
||||||
final Map<ChannelOption<?>, Object> options = options0();
|
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
|
||||||
synchronized (options) {
|
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
|
||||||
setChannelOptions(channel, options, logger);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<AttributeKey<?>, Object> attrs = attrs0();
|
|
||||||
synchronized (attrs) {
|
|
||||||
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
|
|
||||||
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -32,9 +32,9 @@ import io.netty.util.internal.ObjectUtil;
|
|||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -45,8 +45,8 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
|||||||
|
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
|
||||||
|
|
||||||
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
|
private final Map<ChannelOption<?>, Object> childOptions = new ConcurrentHashMap<ChannelOption<?>, Object>();
|
||||||
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
|
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
|
||||||
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
|
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
|
||||||
private volatile EventLoopGroup childGroup;
|
private volatile EventLoopGroup childGroup;
|
||||||
private volatile ChannelHandler childHandler;
|
private volatile ChannelHandler childHandler;
|
||||||
@ -57,13 +57,9 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
|||||||
super(bootstrap);
|
super(bootstrap);
|
||||||
childGroup = bootstrap.childGroup;
|
childGroup = bootstrap.childGroup;
|
||||||
childHandler = bootstrap.childHandler;
|
childHandler = bootstrap.childHandler;
|
||||||
synchronized (bootstrap.childOptions) {
|
|
||||||
childOptions.putAll(bootstrap.childOptions);
|
childOptions.putAll(bootstrap.childOptions);
|
||||||
}
|
|
||||||
synchronized (bootstrap.childAttrs) {
|
|
||||||
childAttrs.putAll(bootstrap.childAttrs);
|
childAttrs.putAll(bootstrap.childAttrs);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client).
|
* Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client).
|
||||||
@ -96,14 +92,10 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
|||||||
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
|
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
|
||||||
ObjectUtil.checkNotNull(childOption, "childOption");
|
ObjectUtil.checkNotNull(childOption, "childOption");
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
synchronized (childOptions) {
|
|
||||||
childOptions.remove(childOption);
|
childOptions.remove(childOption);
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
synchronized (childOptions) {
|
|
||||||
childOptions.put(childOption, value);
|
childOptions.put(childOption, value);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,37 +122,21 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void init(Channel channel) throws Exception {
|
void init(Channel channel) {
|
||||||
final Map<ChannelOption<?>, Object> options = options0();
|
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
|
||||||
synchronized (options) {
|
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
|
||||||
setChannelOptions(channel, options, logger);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<AttributeKey<?>, Object> attrs = attrs0();
|
|
||||||
synchronized (attrs) {
|
|
||||||
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
|
|
||||||
channel.attr(key).set(e.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ChannelPipeline p = channel.pipeline();
|
ChannelPipeline p = channel.pipeline();
|
||||||
|
|
||||||
final EventLoopGroup currentChildGroup = childGroup;
|
final EventLoopGroup currentChildGroup = childGroup;
|
||||||
final ChannelHandler currentChildHandler = childHandler;
|
final ChannelHandler currentChildHandler = childHandler;
|
||||||
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
|
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
|
||||||
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
|
childOptions.entrySet().toArray(newOptionArray(0));
|
||||||
synchronized (childOptions) {
|
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
|
||||||
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
|
|
||||||
}
|
|
||||||
synchronized (childAttrs) {
|
|
||||||
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
p.addLast(new ChannelInitializer<Channel>() {
|
p.addLast(new ChannelInitializer<Channel>() {
|
||||||
@Override
|
@Override
|
||||||
public void initChannel(final Channel ch) throws Exception {
|
public void initChannel(final Channel ch) {
|
||||||
final ChannelPipeline pipeline = ch.pipeline();
|
final ChannelPipeline pipeline = ch.pipeline();
|
||||||
ChannelHandler handler = config.handler();
|
ChannelHandler handler = config.handler();
|
||||||
if (handler != null) {
|
if (handler != null) {
|
||||||
@ -191,16 +167,6 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private static Entry<AttributeKey<?>, Object>[] newAttrArray(int size) {
|
|
||||||
return new Entry[size];
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private static Map.Entry<ChannelOption<?>, Object>[] newOptionArray(int size) {
|
|
||||||
return new Map.Entry[size];
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
|
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
private final EventLoopGroup childGroup;
|
private final EventLoopGroup childGroup;
|
||||||
@ -238,10 +204,7 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
|||||||
child.pipeline().addLast(childHandler);
|
child.pipeline().addLast(childHandler);
|
||||||
|
|
||||||
setChannelOptions(child, childOptions, logger);
|
setChannelOptions(child, childOptions, logger);
|
||||||
|
setAttributes(child, childAttrs);
|
||||||
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
|
|
||||||
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
childGroup.register(child).addListener(new ChannelFutureListener() {
|
childGroup.register(child).addListener(new ChannelFutureListener() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user