Fixed incorrect or missing event order in Bootstraps and NIO UDP transport

This commit is contained in:
Trustin Lee 2009-06-12 01:43:26 +00:00
parent 0a5019385c
commit d42ea03799
5 changed files with 93 additions and 33 deletions

View File

@ -273,10 +273,13 @@ public class ClientBootstrap extends Bootstrap {
public void channelOpen( public void channelOpen(
ChannelHandlerContext context, ChannelHandlerContext context,
ChannelStateEvent event) { ChannelStateEvent event) {
context.sendUpstream(event);
// Apply options. try {
event.getChannel().getConfig().setOptions(bootstrap.getOptions()); // Apply options.
event.getChannel().getConfig().setOptions(bootstrap.getOptions());
} finally {
context.sendUpstream(event);
}
// Bind or connect. // Bind or connect.
if (localAddress != null) { if (localAddress != null) {

View File

@ -352,14 +352,18 @@ public class ConnectionlessBootstrap extends Bootstrap {
public void channelOpen( public void channelOpen(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
ChannelStateEvent evt) { ChannelStateEvent evt) {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Apply options. try {
evt.getChannel().getConfig().setOptions(getOptions()); evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Apply options.
evt.getChannel().getConfig().setOptions(getOptions());
} finally {
ctx.sendUpstream(evt);
}
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
assert finished; assert finished;
ctx.sendUpstream(evt);
} }
@Override @Override

View File

@ -311,27 +311,31 @@ public class ServerBootstrap extends Bootstrap {
public void channelOpen( public void channelOpen(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
ChannelStateEvent evt) { ChannelStateEvent evt) {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Split options into two categories: parent and child. try {
Map<String, Object> allOptions = getOptions(); evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
Map<String, Object> parentOptions = new HashMap<String, Object>();
for (Entry<String, Object> e: allOptions.entrySet()) { // Split options into two categories: parent and child.
if (e.getKey().startsWith("child.")) { Map<String, Object> allOptions = getOptions();
childOptions.put( Map<String, Object> parentOptions = new HashMap<String, Object>();
e.getKey().substring(6), for (Entry<String, Object> e: allOptions.entrySet()) {
e.getValue()); if (e.getKey().startsWith("child.")) {
} else if (!e.getKey().equals("pipelineFactory")) { childOptions.put(
parentOptions.put(e.getKey(), e.getValue()); e.getKey().substring(6),
e.getValue());
} else if (!e.getKey().equals("pipelineFactory")) {
parentOptions.put(e.getKey(), e.getValue());
}
} }
}
// Apply parent options. // Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions); evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
ctx.sendUpstream(evt);
}
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
assert finished; assert finished;
ctx.sendUpstream(evt);
} }
@Override @Override

View File

@ -25,12 +25,14 @@ package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*; import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.AbstractChannelSink; import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
@ -100,11 +102,10 @@ class NioDatagramPipelineSink extends AbstractChannelSink {
} }
break; break;
case CONNECTED: case CONNECTED:
// TODO Implement me
if (value != null) { if (value != null) {
//connect(channel, future, (SocketAddress) value); connect(channel, future, (InetSocketAddress) value);
} else { } else {
//NioUdpWorker.disconnect(channel, future); NioUdpWorker.disconnect(channel, future);
} }
break; break;
case INTEREST_OPS: case INTEREST_OPS:
@ -164,6 +165,42 @@ class NioDatagramPipelineSink extends AbstractChannelSink {
} }
} }
private void connect(
NioDatagramChannel channel, ChannelFuture future,
SocketAddress remoteAddress) {
boolean bound = channel.isBound();
boolean connected = false;
boolean workerStarted = false;
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
try {
channel.getDatagramChannel().connect(remoteAddress);
connected = true;
// Fire events.
future.setSuccess();
if (!bound) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
if (!bound) {
channel.worker.register(channel, future);
}
workerStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (connected && !workerStarted) {
NioUdpWorker.close(channel, future);
}
}
}
NioUdpWorker nextWorker() { NioUdpWorker nextWorker() {
return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
} }

View File

@ -618,6 +618,20 @@ class NioUdpWorker implements Runnable {
} }
} }
static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
try {
channel.getDatagramChannel().disconnect();
future.setSuccess();
if (connected) {
fireChannelDisconnected(channel);
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
static void close(final NioDatagramChannel channel, static void close(final NioDatagramChannel channel,
final ChannelFuture future) { final ChannelFuture future) {
NioUdpWorker worker = channel.worker; NioUdpWorker worker = channel.worker;
@ -820,8 +834,6 @@ class NioUdpWorker implements Runnable {
throw new ChannelException( throw new ChannelException(
"Failed to register a socket to the selector.", e); "Failed to register a socket to the selector.", e);
} }
// XXX: Perhaps channelBind?
fireChannelConnected(channel, localAddress);
} }
} }
} }