Fix the dead lock described in #1175

- Similar to @normanmaurer's fix in that this commit also makes Bootstrap.init(Channel) asynchronous, but it is simpler and less invasive.
- Also made sure a connection attempt failure in the local transport does not trigger an exceptionCaught event
This commit is contained in:
Trustin Lee 2013-03-21 19:19:14 +09:00
parent bd8d53eaed
commit c08919d0a0
4 changed files with 96 additions and 43 deletions

View File

@ -20,6 +20,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -78,15 +79,9 @@ public final class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
@Override
ChannelFuture doBind(SocketAddress localAddress) {
Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
return channel.newFailedFuture(t);
ChannelPromise initPromise = init(channel);
if (initPromise.cause() != null) {
return initPromise;
}
return channel.bind(localAddress).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
@ -147,12 +142,9 @@ public final class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
*/
private ChannelFuture doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.close();
return channel.newFailedFuture(t);
ChannelPromise initPromise = init(channel);
if (initPromise.cause() != null) {
return initPromise;
}
final ChannelFuture future;
@ -166,31 +158,55 @@ public final class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
}
@SuppressWarnings("unchecked")
private void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());
private ChannelPromise init(Channel channel) {
ChannelPromise promise = channel.newPromise();
try {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
group().register(channel, promise);
} catch (Throwable t) {
promise.setFailure(t);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
if (promise.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
group().register(channel).syncUninterruptibly();
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return promise;
}
@Override

View File

@ -614,12 +614,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
try {
doClose();
} catch (Throwable t2) {
logger.warn("Failed to close a channel", t2);
}
closeForcibly();
promise.setFailure(t);
closeFuture.setClosed();
}

View File

@ -313,10 +313,8 @@ public class LocalChannel extends AbstractChannel {
Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
if (!(boundChannel instanceof LocalServerChannel)) {
Exception cause =
new ChannelException("connection refused");
Exception cause = new ChannelException("connection refused");
promise.setFailure(cause);
pipeline().fireExceptionCaught(cause);
close(voidFuture());
return;
}

View File

@ -25,7 +25,6 @@ import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.util.concurrent.Future;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@ -34,8 +33,7 @@ import java.util.List;
public class BootstrapTest {
@Test(timeout = 10000)
@Ignore
public void testInitializationDeadLock() throws Exception {
public void testBindDeadLock() throws Exception {
EventLoopGroup groupA = new LocalEventLoopGroup(1);
EventLoopGroup groupB = new LocalEventLoopGroup(1);
@ -80,6 +78,52 @@ public class BootstrapTest {
}
}
@Test(timeout = 10000)
public void testConnectDeadLock() throws Exception {
EventLoopGroup groupA = new LocalEventLoopGroup(1);
EventLoopGroup groupB = new LocalEventLoopGroup(1);
try {
ChannelInboundMessageHandler<Object> dummyHandler = new DummyHandler();
final Bootstrap bootstrapA = new Bootstrap();
bootstrapA.group(groupA);
bootstrapA.channel(LocalChannel.class);
bootstrapA.handler(dummyHandler);
final Bootstrap bootstrapB = new Bootstrap();
bootstrapB.group(groupB);
bootstrapB.channel(LocalChannel.class);
bootstrapB.handler(dummyHandler);
List<Future<?>> bindFutures = new ArrayList<Future<?>>();
// Try to connect from each other.
for (int i = 0; i < 1024; i ++) {
bindFutures.add(groupA.next().submit(new Runnable() {
@Override
public void run() {
bootstrapB.connect(LocalAddress.ANY);
}
}));
bindFutures.add(groupB.next().submit(new Runnable() {
@Override
public void run() {
bootstrapA.connect(LocalAddress.ANY);
}
}));
}
for (Future<?> f: bindFutures) {
f.sync();
}
} finally {
groupA.shutdown();
groupB.shutdown();
}
}
@Sharable
private static final class DummyHandler extends ChannelInboundMessageHandlerAdapter<Object> {
@Override