NETTY-154 channelOpen / channelBound / channelConnected needs to be called from the boss thread in NIO Socket transport.

* Boss thread now calls channelOpen channelBound and channelConnected
* to avoid a race condition when channel.setInterestOps() in the three handler methods above, interestOpsLock is acquired properly
This commit is contained in:
Trustin Lee 2009-05-15 08:01:54 +00:00
parent 92b336814f
commit 678be7866c
2 changed files with 26 additions and 16 deletions

View File

@ -22,6 +22,8 @@
*/ */
package org.jboss.netty.channel.socket.nio; package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
@ -53,5 +55,9 @@ final class NioAcceptedSocketChannel extends NioSocketChannel {
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException("Failed to enter non-blocking mode.", e); throw new ChannelException("Failed to enter non-blocking mode.", e);
} }
fireChannelOpen(this);
fireChannelBound(this, socket.socket().getLocalSocketAddress());
fireChannelConnected(this, socket.socket().getRemoteSocketAddress());
} }
} }

View File

@ -575,20 +575,22 @@ class NioWorker implements Runnable {
static void setInterestOps( static void setInterestOps(
NioSocketChannel channel, ChannelFuture future, int interestOps) { NioSocketChannel channel, ChannelFuture future, int interestOps) {
NioWorker worker = channel.worker;
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector);
if (key == null || selector == null) {
Exception cause = new NotYetConnectedException();
future.setFailure(cause);
fireExceptionCaught(channel, cause);
}
boolean changed = false; boolean changed = false;
try { try {
// interestOps can change at any time and at any thread. // interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
NioWorker worker = channel.worker;
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector);
if (key == null || selector == null) {
// Not registered to the worker yet.
// Set the rawInterestOps immediately; RegisterTask will pick it up.
channel.setRawInterestOpsNow(interestOps);
return;
}
// Override OP_WRITE flag - a user cannot change this flag. // Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE; interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
@ -665,7 +667,10 @@ class NioWorker implements Runnable {
} }
try { try {
channel.socket.register(selector, SelectionKey.OP_READ, channel); synchronized (channel.interestOpsLock) {
channel.socket.register(
selector, channel.getRawInterestOps(), channel);
}
if (future != null) { if (future != null) {
future.setSuccess(); future.setSuccess();
} }
@ -678,13 +683,12 @@ class NioWorker implements Runnable {
"Failed to register a socket to the selector.", e); "Failed to register a socket to the selector.", e);
} }
if (server) { if (!server) {
fireChannelOpen(channel); if (!((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, localAddress); fireChannelBound(channel, localAddress);
} else if (!((NioClientSocketChannel) channel).boundManually) { }
fireChannelBound(channel, localAddress); fireChannelConnected(channel, remoteAddress);
} }
fireChannelConnected(channel, remoteAddress);
} }
} }
} }