diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index 99eeb961dd..e2dd02e10d 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -19,8 +19,10 @@ package io.netty.bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.util.AttributeKey; @@ -265,7 +267,75 @@ abstract class AbstractBootstrap, C extends Ch return doBind(localAddress); } - abstract ChannelFuture doBind(SocketAddress localAddress); + private ChannelFuture doBind(final SocketAddress localAddress) { + final ChannelFuture regPromise = initAndRegister(); + final Channel channel = regPromise.channel(); + final ChannelPromise promise = channel.newPromise(); + if (regPromise.isDone()) { + doBind0(regPromise, channel, localAddress, promise); + } else { + regPromise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + doBind0(future, channel, localAddress, promise); + } + }); + } + + return promise; + } + + final ChannelFuture initAndRegister() { + final Channel channel = channelFactory().newChannel(); + try { + init(channel); + } catch (Throwable t) { + channel.unsafe().closeForcibly(); + return channel.newFailedFuture(t); + } + + ChannelPromise regPromise = channel.newPromise(); + group().register(channel, regPromise); + if (regPromise.cause() != null) { + if (channel.isRegistered()) { + channel.close(); + } else { + channel.unsafe().closeForcibly(); + } + } + + // If we are here and the promise is not failed, it's one of the following cases: + // 1) If we attempted registration from the event loop, the registration has been completed at this point. + // i.e. It's safe to attempt bind() or connect() now beause the channel has been registered. + // 2) If we attempted registration from the other thread, the registration request has been successfully + // added to the event loop's task queue for later execution. + // i.e. It's safe to attempt bind() or connect() now: + // because bind() or connect() will be executed *after* the scheduled registration task is executed + // because register(), bind(), and connect() are all bound to the same thread. + + return regPromise; + } + + abstract void init(Channel channel) throws Exception; + + private static void doBind0( + final ChannelFuture regFuture, final Channel channel, + final SocketAddress localAddress, final ChannelPromise promise) { + + // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up + // the pipeline in its channelRegistered() implementation. + + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + if (regFuture.isSuccess()) { + channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } else { + promise.setFailure(regFuture.cause()); + } + } + }); + } /** * the {@link ChannelHandler} to use for serving the requests. diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index 069ff44baa..687d717fc9 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -76,39 +76,6 @@ public final class Bootstrap extends AbstractBootstrap { return this; } - @Override - ChannelFuture doBind(final SocketAddress localAddress) { - final Channel channel = channelFactory().newChannel(); - ChannelPromise initPromise = init(channel); - if (initPromise.cause() != null) { - return initPromise; - } - - final ChannelPromise promise = channel.newPromise(); - if (initPromise.isDone()) { - doBind0(initPromise, channel, localAddress, promise); - } else { - initPromise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - doBind0(future, channel, localAddress, promise); - } - }); - } - - return promise; - } - - private static void doBind0( - ChannelFuture initFuture, Channel channel, SocketAddress localAddress, ChannelPromise promise) { - - if (initFuture.isSuccess()) { - channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - } else { - promise.setFailure(initFuture.cause()); - } - } - /** * Connect a {@link Channel} to the remote peer. */ @@ -163,20 +130,20 @@ public final class Bootstrap extends AbstractBootstrap { * @see {@link #connect()} */ private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { - final Channel channel = channelFactory().newChannel(); - ChannelPromise initPromise = init(channel); - if (initPromise.cause() != null) { - return initPromise; + final ChannelFuture regFuture = initAndRegister(); + final Channel channel = regFuture.channel(); + if (regFuture.cause() != null) { + return regFuture; } final ChannelPromise promise = channel.newPromise(); - if (initPromise.isDone()) { - doConnect0(initPromise, channel, remoteAddress, localAddress, promise); + if (regFuture.isDone()) { + doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } else { - initPromise.addListener(new ChannelFutureListener() { + regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - doConnect0(future, channel, remoteAddress, localAddress, promise); + doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } }); } @@ -185,71 +152,53 @@ public final class Bootstrap extends AbstractBootstrap { } private static void doConnect0( - ChannelFuture initFuture, Channel channel, - SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + final ChannelFuture regFuture, final Channel channel, + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - if (initFuture.isSuccess()) { - if (localAddress == null) { - channel.connect(remoteAddress, promise); - } else { - channel.connect(remoteAddress, localAddress, promise); + // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up + // the pipeline in its channelRegistered() implementation. + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + if (regFuture.isSuccess()) { + if (localAddress == null) { + channel.connect(remoteAddress, promise); + } else { + channel.connect(remoteAddress, localAddress, promise); + } + promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } else { + promise.setFailure(regFuture.cause()); + } } - promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - } else { - promise.setFailure(initFuture.cause()); - } + }); } + @Override @SuppressWarnings("unchecked") - private ChannelPromise init(Channel channel) { - ChannelPromise promise = channel.newPromise(); - try { - ChannelPipeline p = channel.pipeline(); - p.addLast(handler()); + void init(Channel channel) throws Exception { + ChannelPipeline p = channel.pipeline(); + p.addLast(handler()); - final Map, Object> options = options(); - synchronized (options) { - for (Entry, Object> e: options.entrySet()) { - try { - if (!channel.config().setOption((ChannelOption) e.getKey(), e.getValue())) { - logger.warn("Unknown channel option: " + e); - } - } catch (Throwable t) { - logger.warn("Failed to set a channel option: " + channel, t); + final Map, Object> options = options(); + synchronized (options) { + for (Entry, Object> e: options.entrySet()) { + try { + if (!channel.config().setOption((ChannelOption) e.getKey(), e.getValue())) { + logger.warn("Unknown channel option: " + e); } + } catch (Throwable t) { + logger.warn("Failed to set a channel option: " + channel, t); } } - - final Map, Object> attrs = attrs(); - synchronized (attrs) { - for (Entry, Object> e: attrs.entrySet()) { - channel.attr((AttributeKey) e.getKey()).set(e.getValue()); - } - } - - group().register(channel, promise); - } catch (Throwable t) { - promise.setFailure(t); - } - - if (promise.cause() != null) { - if (channel.isRegistered()) { - channel.close(); - } else { - channel.unsafe().closeForcibly(); - } } - // If we are here and the promise is not failed, it's one of the following cases: - // 1) If we attempted registration from the event loop, the registration has been completed at this point. - // i.e. It's safe to attempt bind() or connect() now beause the channel has been registered. - // 2) If we attempted registration from the other thread, the registration request has been successfully - // added to the event loop's task queue for later execution. - // i.e. It's safe to attempt bind() or connect() now: - // because bind() or connect() will be executed *after* the scheduled registration task is executed - // because register(), bind(), and connect() are all bound to the same thread. - - return promise; + final Map, Object> attrs = attrs(); + synchronized (attrs) { + for (Entry, Object> e: attrs.entrySet()) { + channel.attr((AttributeKey) e.getKey()).set(e.getValue()); + } + } } @Override diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index d56ed61a40..5ea7c12871 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -18,8 +18,6 @@ package io.netty.bootstrap; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandler; @@ -34,7 +32,6 @@ import io.netty.util.AttributeKey; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.net.SocketAddress; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; @@ -140,17 +137,10 @@ public final class ServerBootstrap extends AbstractBootstrap, Object> options = options(); - synchronized (options) { - channel.config().setOptions(options); - } - } catch (Exception e) { - channel.close(); - return channel.newFailedFuture(e); + void init(Channel channel) throws Exception { + final Map, Object> options = options(); + synchronized (options) { + channel.config().setOptions(options); } final Map, Object> attrs = attrs(); @@ -185,13 +175,6 @@ public final class ServerBootstrap extends AbstractBootstrap