From f69cb45ff9098b6226451ff2701e2e0d2c416422 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 28 Apr 2009 05:28:36 +0000 Subject: [PATCH] Added ConnectionlessBootstrap for UDP transports --- .../bootstrap/ConnectionlessBootstrap.java | 263 ++++++++++++++++++ .../netty/bootstrap/ServerBootstrap.java | 7 +- .../example/qotm/QuoteOfTheMomentClient.java | 10 +- .../example/qotm/QuoteOfTheMomentServer.java | 11 +- 4 files changed, 277 insertions(+), 14 deletions(-) create mode 100644 src/main/java/org/jboss/netty/bootstrap/ConnectionlessBootstrap.java diff --git a/src/main/java/org/jboss/netty/bootstrap/ConnectionlessBootstrap.java b/src/main/java/org/jboss/netty/bootstrap/ConnectionlessBootstrap.java new file mode 100644 index 0000000000..b714f1bca5 --- /dev/null +++ b/src/main/java/org/jboss/netty/bootstrap/ConnectionlessBootstrap.java @@ -0,0 +1,263 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.bootstrap; + +import static org.jboss.netty.channel.Channels.*; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelConfig; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelHandler; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.ChannelPipelineException; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +/** + * A helper class which creates a new server-side {@link Channel} for a + * connectionless transport. + * + *

Only for connectionless transports

+ * + * Use {@link ServerBootstrap} instead for connection oriented transports. + * Do not use this helper if you are using a connection oriented transport such + * as TCP/IP and local transport which accepts an incoming connection and lets + * the accepted child channels handle received messages. + * + *

Configuring channels

+ * + * {@link #setOption(String, Object) Options} are used to configure a channel: + * + *
+ * ConnectionlessBootstrap b = ...;
+ *
+ * // Options for a new channel
+ * b.setOption("localAddress", new {@link InetSocketAddress}(8080));
+ * b.setOption("tcpNoDelay", true);
+ * b.setOption("receiveBufferSize", 1048576);
+ * 
+ * + * For the detailed list of available options, please refer to + * {@link ChannelConfig} and its sub-types + * + *

Configuring a channel pipeline

+ * + * Every channel has its own {@link ChannelPipeline} and you can configure it + * in two ways. + *

+ * {@linkplain #setPipeline(ChannelPipeline) The first approach} is to use + * the default pipeline and let the bootstrap to shallow-copy the default + * pipeline for each new channel: + * + *

+ * ConnectionlessBootstrap b = ...;
+ * {@link ChannelPipeline} p = b.getPipeline();
+ *
+ * // Add handlers to the pipeline.
+ * p.addLast("encoder", new EncodingHandler());
+ * p.addLast("decoder", new DecodingHandler());
+ * p.addLast("logic",   new LogicHandler());
+ * 
+ * + * Please note 'shallow-copy' here means that the added {@link ChannelHandler}s + * are not cloned but only their references are added to the new pipeline. + * Therefore, you have to choose the second approach if you are going to open + * more than one {@link Channel} whose {@link ChannelPipeline} contains any + * {@link ChannelHandler} whose {@link ChannelPipelineCoverage} is {@code "one"}. + * + *

+ * {@linkplain #setPipelineFactory(ChannelPipelineFactory) The second approach} + * is to specify a {@link ChannelPipelineFactory} by yourself and have full + * control over how a new pipeline is created. This approach is more complex: + * + *

+ * ConnectionlessBootstrap b = ...;
+ * b.setPipelineFactory(new MyPipelineFactory());
+ *
+ * public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
+ *   // Create a new pipeline for a new channel and configure it here ...
+ * }
+ * 
+ * + *

Applying different settings for different {@link Channel}s

+ * + * {@link ConnectionlessBootstrap} is just a helper class. It neither + * allocates nor manages any resources. What manages the resources is the + * {@link ChannelFactory} implementation you specified in the constructor of + * {@link ConnectionlessBootstrap}. Therefore, it is OK to create as + * many {@link ConnectionlessBootstrap} instances as you want to apply + * different settings for different {@link Channel}s. + * + * TODO: Show how to shut down a service. + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + * @apiviz.landmark + */ +public class ConnectionlessBootstrap extends Bootstrap { + + /** + * Creates a new instance with no {@link ChannelFactory} set. + * {@link #setFactory(ChannelFactory)} must be called before any I/O + * operation is requested. + */ + public ConnectionlessBootstrap() { + super(); + } + + /** + * Creates a new instance with the specified initial {@link ChannelFactory}. + */ + public ConnectionlessBootstrap(ChannelFactory channelFactory) { + super(channelFactory); + } + + /** + * Creates a new channel which is bound to the local address which was + * specified in the current {@code "localAddress"} option. This method is + * similar to the following code: + * + *
+     * ServerBootstrap b = ...;
+     * b.connect(b.getOption("localAddress"));
+     * 
+ * + * @return a new bound channel which accepts incoming connections + * + * @throws IllegalStateException + * if {@code "localAddress"} option was not set + * @throws ClassCastException + * if {@code "localAddress"} option's value is + * neither a {@link SocketAddress} nor {@code null} + * @throws ChannelException + * if failed to create a new channel and + * bind it to the local address + */ + public Channel bind() { + SocketAddress localAddress = (SocketAddress) getOption("localAddress"); + if (localAddress == null) { + throw new IllegalStateException("localAddress option is not set."); + } + return bind(localAddress); + } + + /** + * Creates a new channel which is bound to the specified local address. + * + * @return a new bound channel which accepts incoming connections + * + * @throws ChannelException + * if failed to create a new channel and + * bind it to the local address + */ + public Channel bind(final SocketAddress localAddress) { + if (localAddress == null) { + throw new NullPointerException("localAddress"); + } + + final BlockingQueue futureQueue = + new LinkedBlockingQueue(); + + ChannelPipeline pipeline; + try { + pipeline = getPipelineFactory().getPipeline(); + } catch (Exception e) { + throw new ChannelPipelineException("Failed to initialize a pipeline.", e); + } + + pipeline.addFirst("binder", new Binder(localAddress, futureQueue)); + + Channel channel = getFactory().newChannel(pipeline); + + // Wait until the future is available. + ChannelFuture future = null; + do { + try { + future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore + } + } while (future == null); + + pipeline.remove("binder"); + + // Wait for the future. + future.awaitUninterruptibly(); + if (!future.isSuccess()) { + future.getChannel().close().awaitUninterruptibly(); + throw new ChannelException("Failed to bind to: " + localAddress, future.getCause()); + } + + return channel; + } + + @ChannelPipelineCoverage("one") + private final class Binder extends SimpleChannelUpstreamHandler { + + private final SocketAddress localAddress; + private final BlockingQueue futureQueue; + + Binder(SocketAddress localAddress, BlockingQueue futureQueue) { + this.localAddress = localAddress; + this.futureQueue = futureQueue; + } + + @Override + public void channelOpen( + ChannelHandlerContext ctx, + ChannelStateEvent evt) { + evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory()); + + // Apply options. + evt.getChannel().getConfig().setOptions(getOptions()); + + boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); + assert finished; + ctx.sendUpstream(evt); + } + + @Override + public void exceptionCaught( + ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + boolean finished = futureQueue.offer(failedFuture(e.getChannel(), e.getCause())); + assert finished; + ctx.sendUpstream(e); + } + } +} diff --git a/src/main/java/org/jboss/netty/bootstrap/ServerBootstrap.java b/src/main/java/org/jboss/netty/bootstrap/ServerBootstrap.java index 2efa08dde0..ec19325d71 100644 --- a/src/main/java/org/jboss/netty/bootstrap/ServerBootstrap.java +++ b/src/main/java/org/jboss/netty/bootstrap/ServerBootstrap.java @@ -55,9 +55,10 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler; * *

Only for connection oriented transports

* - * Do not use this helper if you are using a connectionless transport such as - * UDP/IP which does not accept an incoming connection but receives messages by - * itself without creating a child channel. + * Use {@link ConnectionlessBootstrap} instead for connectionless + * transports. Do not use this helper if you are using a connectionless + * transport such as UDP/IP which does not accept an incoming connection but + * receives messages by itself without creating a child channel. * *

Parent channel and its children

* diff --git a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java index 3731cde219..f0790f4c3e 100644 --- a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java +++ b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentClient.java @@ -25,8 +25,8 @@ package org.jboss.netty.example.qotm; import java.net.InetSocketAddress; import java.util.concurrent.Executors; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.DatagramChannel; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; @@ -49,14 +49,14 @@ public class QuoteOfTheMomentClient { DatagramChannelFactory f = new OioDatagramChannelFactory(Executors.newCachedThreadPool()); - ChannelPipeline p = Channels.pipeline(); + ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); + ChannelPipeline p = b.getPipeline(); p.addLast("encoder", new StringEncoder("UTF-8")); p.addLast("decoder", new StringDecoder("UTF-8")); p.addLast("handler", new QuoteOfTheMomentClientHandler()); - DatagramChannel c = f.newChannel(p); - c.getConfig().setBroadcast(true); - c.bind(new InetSocketAddress(0)).awaitUninterruptibly(); + b.setOption("broadcast", "true"); + DatagramChannel c = (DatagramChannel) b.bind(new InetSocketAddress(0)); // Broadcast the QOTM request to port 8080. c.write("QOTM?", new InetSocketAddress("255.255.255.255", 8080)); diff --git a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java index d37a20afcf..bfe2c76209 100644 --- a/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java +++ b/src/main/java/org/jboss/netty/example/qotm/QuoteOfTheMomentServer.java @@ -25,9 +25,8 @@ package org.jboss.netty.example.qotm; import java.net.InetSocketAddress; import java.util.concurrent.Executors; +import org.jboss.netty.bootstrap.ConnectionlessBootstrap; import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.socket.DatagramChannel; import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; @@ -49,13 +48,13 @@ public class QuoteOfTheMomentServer { DatagramChannelFactory f = new OioDatagramChannelFactory(Executors.newCachedThreadPool()); - ChannelPipeline p = Channels.pipeline(); + ConnectionlessBootstrap b = new ConnectionlessBootstrap(f); + ChannelPipeline p = b.getPipeline(); p.addLast("encoder", new StringEncoder("UTF-8")); p.addLast("decoder", new StringDecoder("UTF-8")); p.addLast("handler", new QuoteOfTheMomentServerHandler()); - DatagramChannel c = f.newChannel(p); - c.getConfig().setBroadcast(false); - c.bind(new InetSocketAddress(8080)).awaitUninterruptibly(); + b.setOption("broadcast", "false"); + b.bind(new InetSocketAddress(8080)); } }