From 4ce85827eda444b7703717c7948f7b247227e655 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 11 Sep 2012 08:31:20 +0200 Subject: [PATCH] Start to refactor bootstraps to share more code and allow for reuse --- .../java/io/netty/bootstrap/Bootstrap.java | 280 ++++++++---------- .../io/netty/bootstrap/ClientBootstrap.java | 133 +++++++++ .../io/netty/bootstrap/ServerBootstrap.java | 145 +++------ .../java/io/netty/channel/ChannelHandler.java | 4 +- .../local/LocalChannelRegistryTest.java | 8 +- .../local/LocalTransportThreadModelTest.java | 2 +- 6 files changed, 294 insertions(+), 278 deletions(-) create mode 100644 transport/src/main/java/io/netty/bootstrap/ClientBootstrap.java diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index 501ec522f2..a4cea1d3b2 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -13,39 +13,32 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; +package io.netty.bootstrap; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.channels.ClosedChannelException; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Map.Entry; -public class Bootstrap { +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelException; - private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class); - private final Map, Object> options = new LinkedHashMap, Object>(); +public abstract class Bootstrap> { private EventLoopGroup group; - private Channel channel; - private ChannelHandler handler; + private ChannelFactory factory; private SocketAddress localAddress; - private SocketAddress remoteAddress; + private final Map, Object> options = new LinkedHashMap, Object>(); + private ChannelHandler handler; - public Bootstrap group(EventLoopGroup group) { + @SuppressWarnings("unchecked") + public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } @@ -53,21 +46,54 @@ public class Bootstrap { throw new IllegalStateException("group set already"); } this.group = group; - return this; + return (B) this; } - public Bootstrap channel(Channel channel) { - if (channel == null) { - throw new NullPointerException("channel"); + public B channel(Class channelClass) { + if (channelClass == null) { + throw new NullPointerException("channelClass"); } - if (this.channel != null) { - throw new IllegalStateException("channel set already"); - } - this.channel = channel; - return this; + return channelFactory(new BootstrapChannelFactory(channelClass)); } - public Bootstrap option(ChannelOption option, T value) { + @SuppressWarnings("unchecked") + public B channelFactory(ChannelFactory factory) { + if (factory == null) { + throw new NullPointerException("factory"); + } + if (this.factory != null) { + throw new IllegalStateException("factory set already"); + } + this.factory = factory; + return (B) this; + } + + @SuppressWarnings("unchecked") + public B localAddress(SocketAddress localAddress) { + this.localAddress = localAddress; + return (B) this; + } + + @SuppressWarnings("unchecked") + public B localAddress(int port) { + localAddress = new InetSocketAddress(port); + return (B) this; + } + + @SuppressWarnings("unchecked") + public B localAddress(String host, int port) { + localAddress = new InetSocketAddress(host, port); + return (B) this; + } + + @SuppressWarnings("unchecked") + public B localAddress(InetAddress host, int port) { + localAddress = new InetSocketAddress(host, port); + return (B) this; + } + + @SuppressWarnings("unchecked") + public B option(ChannelOption option, T value) { if (option == null) { throw new NullPointerException("option"); } @@ -76,135 +102,50 @@ public class Bootstrap { } else { options.put(option, value); } - return this; + return (B) this; } - public Bootstrap handler(ChannelHandler handler) { - if (handler == null) { - throw new NullPointerException("handler"); + public void shutdown() { + if (group != null) { + group.shutdown(); } - this.handler = handler; - return this; } - public Bootstrap localAddress(SocketAddress localAddress) { - this.localAddress = localAddress; - return this; + protected void validate() { + if (group == null) { + throw new IllegalStateException("group not set"); + } + if (factory == null) { + throw new IllegalStateException("factory not set"); + } + if (handler == null) { + throw new IllegalStateException("handler not set"); + } } - public Bootstrap localAddress(int port) { - localAddress = new InetSocketAddress(port); - return this; - } - - public Bootstrap localAddress(String host, int port) { - localAddress = new InetSocketAddress(host, port); - return this; - } - - public Bootstrap localAddress(InetAddress host, int port) { - localAddress = new InetSocketAddress(host, port); - return this; - } - - public Bootstrap remoteAddress(SocketAddress remoteAddress) { - this.remoteAddress = remoteAddress; - return this; - } - - public Bootstrap remoteAddress(String host, int port) { - remoteAddress = new InetSocketAddress(host, port); - return this; - } - - public Bootstrap remoteAddress(InetAddress host, int port) { - remoteAddress = new InetSocketAddress(host, port); - return this; + protected final void validate(ChannelFuture future) { + if (future == null) { + throw new NullPointerException("future"); + } + validate(); } public ChannelFuture bind() { validate(); + Channel channel = factory().newChannel(); return bind(channel.newFuture()); } - public ChannelFuture bind(ChannelFuture future) { - validate(future); - if (localAddress == null) { - throw new IllegalStateException("localAddress not set"); + @SuppressWarnings("unchecked") + public B handler(ChannelHandler handler) { + if (handler == null) { + throw new NullPointerException("handler"); } - - try { - init(); - } catch (Throwable t) { - future.setFailure(t); - return future; - } - - if (!ensureOpen(future)) { - return future; - } - - return channel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + this.handler = handler; + return (B) this; } - public ChannelFuture connect() { - validate(); - return connect(channel.newFuture()); - } - - public ChannelFuture connect(ChannelFuture future) { - validate(future); - if (remoteAddress == null) { - throw new IllegalStateException("remoteAddress not set"); - } - - try { - init(); - } catch (Throwable t) { - future.setFailure(t); - return future; - } - - if (!ensureOpen(future)) { - return future; - } - - if (localAddress == null) { - channel.connect(remoteAddress, future); - } else { - channel.connect(remoteAddress, localAddress, future); - } - return future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - } - - private void init() throws Exception { - if (channel.isActive()) { - throw new IllegalStateException("channel already active:: " + channel); - } - if (channel.isRegistered()) { - throw new IllegalStateException("channel already registered: " + channel); - } - if (!channel.isOpen()) { - throw new ClosedChannelException(); - } - - ChannelPipeline p = channel.pipeline(); - p.addLast(handler); - - for (Entry, Object> e: options.entrySet()) { - try { - if (!channel.config().setOption((ChannelOption) e.getKey(), e.getValue())) { - logger.warn("Unknown channel option: " + e); - } - } catch (Throwable t) { - logger.warn("Failed to set a channel option: " + channel, t); - } - } - - group.register(channel).syncUninterruptibly(); - } - - private static boolean ensureOpen(ChannelFuture future) { + protected static boolean ensureOpen(ChannelFuture future) { if (!future.channel().isOpen()) { // Registration was successful but the channel was closed due to some failure in // handler. @@ -214,32 +155,47 @@ public class Bootstrap { return true; } - public void shutdown() { - if (group != null) { - group.shutdown(); - } + public abstract ChannelFuture bind(ChannelFuture future); + + protected final SocketAddress localAddress() { + return localAddress; } - private void validate() { - if (group == null) { - throw new IllegalStateException("group not set"); - } - if (channel == null) { - throw new IllegalStateException("channel not set"); - } - if (handler == null) { - throw new IllegalStateException("handler not set"); - } + protected final ChannelFactory factory() { + return factory; } - private void validate(ChannelFuture future) { - if (future == null) { - throw new NullPointerException("future"); + protected final ChannelHandler handler() { + return handler; + } + + protected final EventLoopGroup group() { + return group; + } + + protected final Map, Object> options() { + return options; + } + + private final class BootstrapChannelFactory implements ChannelFactory { + private final Class clazz; + + BootstrapChannelFactory(Class clazz) { + this.clazz = clazz; } - if (future.channel() != channel) { - throw new IllegalArgumentException("future.channel() must be the same channel."); + @Override + public Channel newChannel() { + try { + return clazz.newInstance(); + } catch (Throwable t) { + throw new ChannelException("Unable to create Channel from class " + clazz, t); + } } - validate(); + + } + + public interface ChannelFactory { + Channel newChannel(); } } diff --git a/transport/src/main/java/io/netty/bootstrap/ClientBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ClientBootstrap.java new file mode 100644 index 0000000000..2853860940 --- /dev/null +++ b/transport/src/main/java/io/netty/bootstrap/ClientBootstrap.java @@ -0,0 +1,133 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.bootstrap; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.Map.Entry; + +public class ClientBootstrap extends Bootstrap { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(ClientBootstrap.class); + private SocketAddress remoteAddress; + + + public ClientBootstrap remoteAddress(SocketAddress remoteAddress) { + this.remoteAddress = remoteAddress; + return this; + } + + public ClientBootstrap remoteAddress(String host, int port) { + remoteAddress = new InetSocketAddress(host, port); + return this; + } + + public ClientBootstrap remoteAddress(InetAddress host, int port) { + remoteAddress = new InetSocketAddress(host, port); + return this; + } + + @Override + public ChannelFuture bind(ChannelFuture future) { + validate(future); + if (localAddress() == null) { + throw new IllegalStateException("localAddress not set"); + } + + try { + init(future.channel()); + } catch (Throwable t) { + future.setFailure(t); + return future; + } + + if (!ensureOpen(future)) { + return future; + } + + return future.channel().bind(localAddress(), future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } + + public ChannelFuture connect() { + validate(); + Channel channel = factory().newChannel(); + return connect(channel.newFuture()); + } + + public ChannelFuture connect(ChannelFuture future) { + validate(future); + if (remoteAddress == null) { + throw new IllegalStateException("remoteAddress not set"); + } + + try { + init(future.channel()); + } catch (Throwable t) { + future.setFailure(t); + return future; + } + + if (!ensureOpen(future)) { + return future; + } + + if (localAddress() == null) { + future.channel().connect(remoteAddress, future); + } else { + future.channel().connect(remoteAddress, localAddress(), future); + } + return future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } + + @SuppressWarnings("unchecked") + private void init(Channel channel) throws Exception { + if (channel.isActive()) { + throw new IllegalStateException("channel already active:: " + channel); + } + if (channel.isRegistered()) { + throw new IllegalStateException("channel already registered: " + channel); + } + if (!channel.isOpen()) { + throw new ClosedChannelException(); + } + + ChannelPipeline p = channel.pipeline(); + p.addLast(handler()); + + for (Entry, Object> e: options().entrySet()) { + try { + if (!channel.config().setOption((ChannelOption) e.getKey(), e.getValue())) { + logger.warn("Unknown channel option: " + e); + } + } catch (Throwable t) { + logger.warn("Failed to set a channel option: " + channel, t); + } + } + + group().register(channel).syncUninterruptibly(); + } + +} diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index 3b86b808c8..0a7fe6a358 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -17,32 +17,30 @@ package io.netty.bootstrap; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; + import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInboundMessageHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.NetworkConstants; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; -public class ServerBootstrap { +public class ServerBootstrap extends Bootstrap { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class); private static final InetSocketAddress DEFAULT_LOCAL_ADDR = new InetSocketAddress(NetworkConstants.LOCALHOST, 0); @@ -54,60 +52,37 @@ public class ServerBootstrap { } }; - private final Map, Object> parentOptions = new LinkedHashMap, Object>(); private final Map, Object> childOptions = new LinkedHashMap, Object>(); - private EventLoopGroup parentGroup; private EventLoopGroup childGroup; - private ServerChannel channel; private ChannelHandler handler; private ChannelHandler childHandler; - private SocketAddress localAddress; + @Override public ServerBootstrap group(EventLoopGroup group) { - if (group == null) { - throw new NullPointerException("group"); - } - if (parentGroup != null) { - throw new IllegalStateException("parentGroup set already"); - } - parentGroup = group; - childGroup = group; - return this; + return group(group, group); } public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { - if (parentGroup == null) { - throw new NullPointerException("parentGroup"); + super.group(parentGroup); + if (childGroup == null) { + throw new NullPointerException("childGroup"); } - if (this.parentGroup != null) { - throw new IllegalStateException("parentGroup set already"); + if (this.childGroup != null) { + throw new IllegalStateException("childGroup set already"); } - this.parentGroup = parentGroup; this.childGroup = childGroup; return this; } - public ServerBootstrap channel(ServerChannel channel) { - if (channel == null) { - throw new NullPointerException("channel"); + @Override + public ServerBootstrap channel(Class channelClass) { + if (channelClass == null) { + throw new NullPointerException("channelClass"); } - if (this.channel != null) { - throw new IllegalStateException("channel set already"); + if (!ServerChannel.class.isAssignableFrom(channelClass)) { + throw new IllegalArgumentException(); } - this.channel = channel; - return this; - } - - public ServerBootstrap option(ChannelOption parentOption, T value) { - if (parentOption == null) { - throw new NullPointerException("parentOption"); - } - if (value == null) { - parentOptions.remove(parentOption); - } else { - parentOptions.put(parentOption, value); - } - return this; + return super.channel(channelClass); } public ServerBootstrap childOption(ChannelOption childOption, T value) { @@ -122,11 +97,6 @@ public class ServerBootstrap { return this; } - public ServerBootstrap handler(ChannelHandler handler) { - this.handler = handler; - return this; - } - public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); @@ -135,36 +105,10 @@ public class ServerBootstrap { return this; } - public ServerBootstrap localAddress(SocketAddress localAddress) { - if (localAddress == null) { - throw new NullPointerException("localAddress"); - } - this.localAddress = localAddress; - return this; - } - - public ServerBootstrap localAddress(int port) { - localAddress = new InetSocketAddress(port); - return this; - } - - public ServerBootstrap localAddress(String host, int port) { - localAddress = new InetSocketAddress(host, port); - return this; - } - - public ServerBootstrap localAddress(InetAddress host, int port) { - localAddress = new InetSocketAddress(host, port); - return this; - } - - public ChannelFuture bind() { - validate(); - return bind(channel.newFuture()); - } - + @Override public ChannelFuture bind(ChannelFuture future) { validate(future); + Channel channel = future.channel(); if (channel.isActive()) { future.setFailure(new IllegalStateException("channel already bound: " + channel)); return future; @@ -179,75 +123,57 @@ public class ServerBootstrap { } try { - channel.config().setOptions(parentOptions); + channel.config().setOptions(options()); } catch (Exception e) { future.setFailure(e); return future; } - ChannelPipeline p = channel.pipeline(); + ChannelPipeline p = future.channel().pipeline(); if (handler != null) { p.addLast(handler); } p.addLast(acceptor); - ChannelFuture f = parentGroup.register(channel).awaitUninterruptibly(); + ChannelFuture f = group().register(channel).awaitUninterruptibly(); if (!f.isSuccess()) { future.setFailure(f.cause()); return future; } - if (!channel.isOpen()) { - // Registration was successful but the channel was closed due to some failure in - // handler. - future.setFailure(new ChannelException("initialization failure")); + if (!ensureOpen(future)) { return future; } - channel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + channel.bind(localAddress(), future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); return future; } + @Override public void shutdown() { - if (parentGroup != null) { - parentGroup.shutdown(); - } + super.shutdown(); if (childGroup != null) { childGroup.shutdown(); } } - private void validate() { - if (parentGroup == null) { - throw new IllegalStateException("parentGroup not set"); - } - if (channel == null) { - throw new IllegalStateException("channel not set"); - } + @Override + protected void validate() { + super.validate(); if (childHandler == null) { throw new IllegalStateException("childHandler not set"); } if (childGroup == null) { logger.warn("childGroup is not set. Using parentGroup instead."); - childGroup = parentGroup; + childGroup = group(); } - if (localAddress == null) { + if (localAddress() == null) { logger.warn("localAddress is not set. Using " + DEFAULT_LOCAL_ADDR + " instead."); - localAddress = DEFAULT_LOCAL_ADDR; + localAddress(DEFAULT_LOCAL_ADDR); } } - private void validate(ChannelFuture future) { - if (future == null) { - throw new NullPointerException("future"); - } - - if (future.channel() != channel) { - throw new IllegalArgumentException("future.channel() must be the same channel."); - } - validate(); - } private class Acceptor extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler { @@ -257,6 +183,7 @@ public class ServerBootstrap { return Unpooled.messageBuffer(); } + @SuppressWarnings("unchecked") @Override public void inboundBufferUpdated(ChannelHandlerContext ctx) { MessageBuf in = ctx.inboundMessageBuffer(); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandler.java b/transport/src/main/java/io/netty/channel/ChannelHandler.java index 358c413d90..2b93ff0e52 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandler.java @@ -15,7 +15,7 @@ */ package io.netty.channel; -import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ClientBootstrap; import io.netty.channel.group.ChannelGroup; import java.lang.annotation.Documented; @@ -87,7 +87,7 @@ import java.nio.channels.Channels; * the confidential information: *
  * // Create a new handler instance per channel.
- * // See {@link Bootstrap#setPipelineFactory(ChannelPipelineFactory)}.
+ * // See {@link ClientBootstrap#setPipelineFactory(ChannelPipelineFactory)}.
  * public class DataServerPipelineFactory implements {@link ChannelPipelineFactory} {
  *     public {@link ChannelPipeline} getPipeline() {
  *         return {@link Channels}.pipeline(new DataServerHandler());
diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java
index b7eef46d2d..d0871733db 100644
--- a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java
+++ b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java
@@ -15,7 +15,7 @@
  */
 package io.netty.channel.local;
 
-import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ClientBootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
@@ -39,16 +39,16 @@ public class LocalChannelRegistryTest {
 
         for (int i = 0; i < 2; i ++) {
             LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
-            Bootstrap cb = new Bootstrap();
+            ClientBootstrap cb = new ClientBootstrap();
             ServerBootstrap sb = new ServerBootstrap();
 
             cb.group(new LocalEventLoopGroup())
-              .channel(new LocalChannel())
+              .channel(LocalChannel.class)
               .remoteAddress(addr)
               .handler(new TestHandler());
 
             sb.group(new LocalEventLoopGroup())
-              .channel(new LocalServerChannel())
+              .channel(LocalServerChannel.class)
               .localAddress(addr)
               .childHandler(new ChannelInitializer() {
                   @Override
diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java
index 6e5c339ccc..84ebb499c9 100644
--- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java
+++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java
@@ -58,7 +58,7 @@ public class LocalTransportThreadModelTest {
         // Configure a test server
         sb = new ServerBootstrap();
         sb.group(new LocalEventLoopGroup())
-          .channel(new LocalServerChannel())
+          .channel(LocalServerChannel.class)
           .localAddress(LocalAddress.ANY)
           .childHandler(new ChannelInitializer() {
               @Override