Make Bootstrap and ServerBootstrap thread-safe

- Additional fix for: #970
- Use LinkedHashMap again to save memory consumption
- ServerBootstrap now makes a copy of child parameters so that modifying ServerBootstrap after bind() does not affect the already-bound servers. This also makes child channel initialization potentially faster due to reduced garbage iterator.
This commit is contained in:
Trustin Lee 2013-01-31 11:34:28 +09:00
parent 604b359d9e
commit 152c969eab
3 changed files with 137 additions and 60 deletions

View File

@ -27,8 +27,8 @@ import io.netty.util.AttributeKey;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
@ -37,12 +37,12 @@ import java.util.concurrent.ConcurrentHashMap;
*/
abstract class AbstractBootstrap<B extends AbstractBootstrap<?, C>, C extends Channel> implements Cloneable {
private EventLoopGroup group;
private ChannelFactory<? extends C> channelFactory;
private SocketAddress localAddress;
private final Map<ChannelOption<?>, Object> options = new ConcurrentHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
private ChannelHandler handler;
private volatile EventLoopGroup group;
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile ChannelHandler handler;
AbstractBootstrap() {
// Disallow extending from a different package.
@ -53,8 +53,12 @@ abstract class AbstractBootstrap<B extends AbstractBootstrap<?, C>, C extends Ch
channelFactory = bootstrap.channelFactory;
handler = bootstrap.handler;
localAddress = bootstrap.localAddress;
options.putAll(bootstrap.options);
attrs.putAll(bootstrap.attrs);
synchronized (bootstrap.options) {
options.putAll(bootstrap.options);
}
synchronized (bootstrap.attrs) {
attrs.putAll(bootstrap.attrs);
}
}
/**
@ -146,9 +150,13 @@ abstract class AbstractBootstrap<B extends AbstractBootstrap<?, C>, C extends Ch
throw new NullPointerException("option");
}
if (value == null) {
options.remove(option);
synchronized (options) {
options.remove(option);
}
} else {
options.put(option, value);
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
@ -162,9 +170,13 @@ abstract class AbstractBootstrap<B extends AbstractBootstrap<?, C>, C extends Ch
throw new NullPointerException("key");
}
if (value == null) {
attrs.remove(key);
synchronized (attrs) {
attrs.remove(key);
}
} else {
attrs.put(key, value);
synchronized (attrs) {
attrs.put(key, value);
}
}
@SuppressWarnings("unchecked")
@ -307,15 +319,19 @@ abstract class AbstractBootstrap<B extends AbstractBootstrap<?, C>, C extends Ch
buf.append(localAddress);
buf.append(", ");
}
if (options != null && !options.isEmpty()) {
buf.append("options: ");
buf.append(options);
buf.append(", ");
synchronized (options) {
if (!options.isEmpty()) {
buf.append("options: ");
buf.append(options);
buf.append(", ");
}
}
if (attrs != null && !attrs.isEmpty()) {
buf.append("attrs: ");
buf.append(attrs);
buf.append(", ");
synchronized (attrs) {
if (!attrs.isEmpty()) {
buf.append("attrs: ");
buf.append(attrs);
buf.append(", ");
}
}
if (handler != null) {
buf.append("handler: ");

View File

@ -27,6 +27,7 @@ import io.netty.util.AttributeKey;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Map.Entry;
/**
@ -38,7 +39,7 @@ public final class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class);
private SocketAddress remoteAddress;
private volatile SocketAddress remoteAddress;
public Bootstrap() { }
@ -163,18 +164,24 @@ public final class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());
for (Entry<ChannelOption<?>, Object> e: options().entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
for (Entry<AttributeKey<?>, Object> e: attrs().entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
group().register(channel).syncUninterruptibly();

View File

@ -47,17 +47,10 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
private final ChannelHandler acceptor = new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new Acceptor());
}
};
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private EventLoopGroup childGroup;
private ChannelHandler childHandler;
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
public ServerBootstrap() { }
@ -65,8 +58,12 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
super(bootstrap);
childGroup = bootstrap.childGroup;
childHandler = bootstrap.childHandler;
childOptions.putAll(bootstrap.childOptions);
childAttrs.putAll(bootstrap.childAttrs);
synchronized (bootstrap.childOptions) {
childOptions.putAll(bootstrap.childOptions);
}
synchronized (bootstrap.childAttrs) {
childAttrs.putAll(bootstrap.childAttrs);
}
}
/**
@ -104,9 +101,13 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
throw new NullPointerException("childOption");
}
if (value == null) {
childOptions.remove(childOption);
synchronized (childOptions) {
childOptions.remove(childOption);
}
} else {
childOptions.put(childOption, value);
synchronized (childOptions) {
childOptions.put(childOption, value);
}
}
return this;
}
@ -143,23 +144,47 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
Channel channel = channelFactory().newChannel();
try {
channel.config().setOptions(options());
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
} catch (Exception e) {
channel.close();
return channel.newFailedFuture(e);
}
for (Entry<AttributeKey<?>, Object> e: attrs().entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}
p.addLast(acceptor);
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
ChannelFuture f = group().register(channel).awaitUninterruptibly();
if (!f.isSuccess()) {
@ -189,9 +214,34 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
}
}
private class Acceptor
@SuppressWarnings("unchecked")
private static Entry<ChannelOption<?>, Object>[] newOptionArray(int size) {
return new Entry[size];
}
@SuppressWarnings("unchecked")
private static Entry<AttributeKey<?>, Object>[] newAttrArray(int size) {
return new Entry[size];
}
private static class ServerBootstrapAcceptor
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<Channel> {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
@SuppressWarnings("unchecked")
ServerBootstrapAcceptor(
EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
}
@Override
public MessageBuf<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
@ -209,7 +259,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions.entrySet()) {
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
@ -219,7 +269,7 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs.entrySet()) {
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
@ -249,15 +299,19 @@ public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, Se
buf.append(childGroup.getClass().getSimpleName());
buf.append(", ");
}
if (childOptions != null && !childOptions.isEmpty()) {
buf.append("childOptions: ");
buf.append(childOptions);
buf.append(", ");
synchronized (childOptions) {
if (!childOptions.isEmpty()) {
buf.append("childOptions: ");
buf.append(childOptions);
buf.append(", ");
}
}
if (childAttrs != null && !childAttrs.isEmpty()) {
buf.append("childAttrs: ");
buf.append(childAttrs);
buf.append(", ");
synchronized (childAttrs) {
if (!childAttrs.isEmpty()) {
buf.append("childAttrs: ");
buf.append(childAttrs);
buf.append(", ");
}
}
if (childHandler != null) {
buf.append("childHandler: ");