From 678be7866cea72cfac01051855d2bb4d772050d9 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 15 May 2009 08:01:54 +0000 Subject: [PATCH] NETTY-154 channelOpen / channelBound / channelConnected needs to be called from the boss thread in NIO Socket transport. * Boss thread now calls channelOpen channelBound and channelConnected * to avoid a race condition when channel.setInterestOps() in the three handler methods above, interestOpsLock is acquired properly --- .../socket/nio/NioAcceptedSocketChannel.java | 6 ++++ .../netty/channel/socket/nio/NioWorker.java | 36 ++++++++++--------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java index c2aee1ca0d..0bbdfc8889 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java @@ -22,6 +22,8 @@ */ package org.jboss.netty.channel.socket.nio; +import static org.jboss.netty.channel.Channels.*; + import java.io.IOException; import java.nio.channels.SocketChannel; @@ -53,5 +55,9 @@ final class NioAcceptedSocketChannel extends NioSocketChannel { } catch (IOException e) { throw new ChannelException("Failed to enter non-blocking mode.", e); } + + fireChannelOpen(this); + fireChannelBound(this, socket.socket().getLocalSocketAddress()); + fireChannelConnected(this, socket.socket().getRemoteSocketAddress()); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 7dba09829b..8a7257e6d0 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -575,20 +575,22 @@ class NioWorker implements Runnable { static void setInterestOps( NioSocketChannel channel, ChannelFuture future, int interestOps) { - NioWorker worker = channel.worker; - Selector selector = worker.selector; - SelectionKey key = channel.socket.keyFor(selector); - if (key == null || selector == null) { - Exception cause = new NotYetConnectedException(); - future.setFailure(cause); - fireExceptionCaught(channel, cause); - } - boolean changed = false; try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { + NioWorker worker = channel.worker; + Selector selector = worker.selector; + SelectionKey key = channel.socket.keyFor(selector); + + if (key == null || selector == null) { + // Not registered to the worker yet. + // Set the rawInterestOps immediately; RegisterTask will pick it up. + channel.setRawInterestOpsNow(interestOps); + return; + } + // Override OP_WRITE flag - a user cannot change this flag. interestOps &= ~Channel.OP_WRITE; interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE; @@ -665,7 +667,10 @@ class NioWorker implements Runnable { } try { - channel.socket.register(selector, SelectionKey.OP_READ, channel); + synchronized (channel.interestOpsLock) { + channel.socket.register( + selector, channel.getRawInterestOps(), channel); + } if (future != null) { future.setSuccess(); } @@ -678,13 +683,12 @@ class NioWorker implements Runnable { "Failed to register a socket to the selector.", e); } - if (server) { - fireChannelOpen(channel); - fireChannelBound(channel, localAddress); - } else if (!((NioClientSocketChannel) channel).boundManually) { - fireChannelBound(channel, localAddress); + if (!server) { + if (!((NioClientSocketChannel) channel).boundManually) { + fireChannelBound(channel, localAddress); + } + fireChannelConnected(channel, remoteAddress); } - fireChannelConnected(channel, remoteAddress); } } }