From d42ea03799b3df4fb87cd9476673e9132fe61106 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 12 Jun 2009 01:43:26 +0000 Subject: [PATCH] Fixed incorrect or missing event order in Bootstraps and NIO UDP transport --- .../netty/bootstrap/ClientBootstrap.java | 9 ++-- .../bootstrap/ConnectionlessBootstrap.java | 24 ++++++----- .../netty/bootstrap/ServerBootstrap.java | 34 ++++++++------- .../socket/nio/NioDatagramPipelineSink.java | 43 +++++++++++++++++-- .../channel/socket/nio/NioUdpWorker.java | 16 ++++++- 5 files changed, 93 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/jboss/netty/bootstrap/ClientBootstrap.java b/src/main/java/org/jboss/netty/bootstrap/ClientBootstrap.java index f94368d725..65da44ce6b 100644 --- a/src/main/java/org/jboss/netty/bootstrap/ClientBootstrap.java +++ b/src/main/java/org/jboss/netty/bootstrap/ClientBootstrap.java @@ -273,10 +273,13 @@ public class ClientBootstrap extends Bootstrap { public void channelOpen( ChannelHandlerContext context, ChannelStateEvent event) { - context.sendUpstream(event); - // Apply options. - event.getChannel().getConfig().setOptions(bootstrap.getOptions()); + try { + // Apply options. + event.getChannel().getConfig().setOptions(bootstrap.getOptions()); + } finally { + context.sendUpstream(event); + } // Bind or connect. if (localAddress != null) { diff --git a/src/main/java/org/jboss/netty/bootstrap/ConnectionlessBootstrap.java b/src/main/java/org/jboss/netty/bootstrap/ConnectionlessBootstrap.java index 5f5fc9eccd..7a2b70e33f 100644 --- a/src/main/java/org/jboss/netty/bootstrap/ConnectionlessBootstrap.java +++ b/src/main/java/org/jboss/netty/bootstrap/ConnectionlessBootstrap.java @@ -339,29 +339,33 @@ public class ConnectionlessBootstrap extends Bootstrap { @ChannelPipelineCoverage("one") private final class ConnectionlessBinder extends SimpleChannelUpstreamHandler { - + private final SocketAddress localAddress; private final BlockingQueue futureQueue; - + ConnectionlessBinder(SocketAddress localAddress, BlockingQueue futureQueue) { this.localAddress = localAddress; this.futureQueue = futureQueue; } - + @Override public void channelOpen( ChannelHandlerContext ctx, ChannelStateEvent evt) { - evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory()); - - // Apply options. - evt.getChannel().getConfig().setOptions(getOptions()); - + + try { + evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory()); + + // Apply options. + evt.getChannel().getConfig().setOptions(getOptions()); + } finally { + ctx.sendUpstream(evt); + } + boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); assert finished; - ctx.sendUpstream(evt); } - + @Override public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e) diff --git a/src/main/java/org/jboss/netty/bootstrap/ServerBootstrap.java b/src/main/java/org/jboss/netty/bootstrap/ServerBootstrap.java index ec19325d71..1f43f04807 100644 --- a/src/main/java/org/jboss/netty/bootstrap/ServerBootstrap.java +++ b/src/main/java/org/jboss/netty/bootstrap/ServerBootstrap.java @@ -311,27 +311,31 @@ public class ServerBootstrap extends Bootstrap { public void channelOpen( ChannelHandlerContext ctx, ChannelStateEvent evt) { - evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory()); - // Split options into two categories: parent and child. - Map allOptions = getOptions(); - Map parentOptions = new HashMap(); - for (Entry e: allOptions.entrySet()) { - if (e.getKey().startsWith("child.")) { - childOptions.put( - e.getKey().substring(6), - e.getValue()); - } else if (!e.getKey().equals("pipelineFactory")) { - parentOptions.put(e.getKey(), e.getValue()); + try { + evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory()); + + // Split options into two categories: parent and child. + Map allOptions = getOptions(); + Map parentOptions = new HashMap(); + for (Entry e: allOptions.entrySet()) { + if (e.getKey().startsWith("child.")) { + childOptions.put( + e.getKey().substring(6), + e.getValue()); + } else if (!e.getKey().equals("pipelineFactory")) { + parentOptions.put(e.getKey(), e.getValue()); + } } - } - // Apply parent options. - evt.getChannel().getConfig().setOptions(parentOptions); + // Apply parent options. + evt.getChannel().getConfig().setOptions(parentOptions); + } finally { + ctx.sendUpstream(evt); + } boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); assert finished; - ctx.sendUpstream(evt); } @Override diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java index d6483e0461..c72087417e 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -25,12 +25,14 @@ package org.jboss.netty.channel.socket.nio; import static org.jboss.netty.channel.Channels.*; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import org.jboss.netty.channel.AbstractChannelSink; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; @@ -100,11 +102,10 @@ class NioDatagramPipelineSink extends AbstractChannelSink { } break; case CONNECTED: - // TODO Implement me if (value != null) { - //connect(channel, future, (SocketAddress) value); + connect(channel, future, (InetSocketAddress) value); } else { - //NioUdpWorker.disconnect(channel, future); + NioUdpWorker.disconnect(channel, future); } break; case INTEREST_OPS: @@ -164,6 +165,42 @@ class NioDatagramPipelineSink extends AbstractChannelSink { } } + private void connect( + NioDatagramChannel channel, ChannelFuture future, + SocketAddress remoteAddress) { + + boolean bound = channel.isBound(); + boolean connected = false; + boolean workerStarted = false; + + future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + + try { + channel.getDatagramChannel().connect(remoteAddress); + connected = true; + + // Fire events. + future.setSuccess(); + if (!bound) { + fireChannelBound(channel, channel.getLocalAddress()); + } + fireChannelConnected(channel, channel.getRemoteAddress()); + + if (!bound) { + channel.worker.register(channel, future); + } + + workerStarted = true; + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } finally { + if (connected && !workerStarted) { + NioUdpWorker.close(channel, future); + } + } + } + NioUdpWorker nextWorker() { return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java index b2f445bb96..ed0315d7f6 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioUdpWorker.java @@ -618,6 +618,20 @@ class NioUdpWorker implements Runnable { } } + static void disconnect(NioDatagramChannel channel, ChannelFuture future) { + boolean connected = channel.isConnected(); + try { + channel.getDatagramChannel().disconnect(); + future.setSuccess(); + if (connected) { + fireChannelDisconnected(channel); + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + static void close(final NioDatagramChannel channel, final ChannelFuture future) { NioUdpWorker worker = channel.worker; @@ -820,8 +834,6 @@ class NioUdpWorker implements Runnable { throw new ChannelException( "Failed to register a socket to the selector.", e); } - // XXX: Perhaps channelBind? - fireChannelConnected(channel, localAddress); } } }