Make sure the ChannelOpen, ChannelBound and ChannelConnected events get

fired from within an IO-Worker Thread. This makes sure the Boss-Thread
will not get blocked by any user action
This commit is contained in:
norman 2011-10-12 13:02:50 +02:00
parent 28120aa778
commit bbdc2032f0
4 changed files with 21 additions and 12 deletions

View File

@ -15,8 +15,6 @@
*/ */
package org.jboss.netty.channel.socket.nio; package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
@ -46,8 +44,5 @@ final class NioAcceptedSocketChannel extends NioSocketChannel {
this.bossThread = bossThread; this.bossThread = bossThread;
setConnected(); setConnected();
fireChannelOpen(this);
fireChannelBound(this, getLocalAddress());
fireChannelConnected(this, getRemoteAddress());
} }
} }

View File

@ -82,6 +82,7 @@ class NioWorker implements Runnable {
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
private final AtomicBoolean fireConnect = new AtomicBoolean(true);
NioWorker(int bossId, int id, Executor executor) { NioWorker(int bossId, int id, Executor executor) {
this.bossId = bossId; this.bossId = bossId;
@ -96,6 +97,8 @@ class NioWorker implements Runnable {
Selector selector; Selector selector;
synchronized (startStopLock) { synchronized (startStopLock) {
fireConnect.set(true);
if (!started) { if (!started) {
// Open a selector if this worker didn't start yet. // Open a selector if this worker didn't start yet.
try { try {
@ -160,6 +163,7 @@ class NioWorker implements Runnable {
} }
try { try {
SelectorUtil.select(selector); SelectorUtil.select(selector);
// 'wakenUp.compareAndSet(false, true)' is always evaluated // 'wakenUp.compareAndSet(false, true)' is always evaluated
@ -790,6 +794,14 @@ class NioWorker implements Runnable {
} }
fireChannelConnected(channel, remoteAddress); fireChannelConnected(channel, remoteAddress);
} }
// Handle the channelOpen, channelBound and channelConnected in the worker thread
if (channel instanceof NioAcceptedSocketChannel) {
fireChannelOpen(channel);
fireChannelBound(channel, channel.getLocalAddress());
fireChannelConnected(channel, channel.getRemoteAddress());
}
} }
} }
} }

View File

@ -15,8 +15,6 @@
*/ */
package org.jboss.netty.channel.socket.oio; package org.jboss.netty.channel.socket.oio;
import static org.jboss.netty.channel.Channels.*;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PushbackInputStream; import java.io.PushbackInputStream;
@ -60,10 +58,6 @@ class OioAcceptedSocketChannel extends OioSocketChannel {
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException("Failed to obtain an OutputStream.", e); throw new ChannelException("Failed to obtain an OutputStream.", e);
} }
fireChannelOpen(this);
fireChannelBound(this, getLocalAddress());
fireChannelConnected(this, getRemoteAddress());
} }
@Override @Override

View File

@ -50,7 +50,15 @@ class OioWorker implements Runnable {
channel.workerThread = Thread.currentThread(); channel.workerThread = Thread.currentThread();
final PushbackInputStream in = channel.getInputStream(); final PushbackInputStream in = channel.getInputStream();
boolean fireOpen = channel instanceof OioAcceptedSocketChannel;
while (channel.isOpen()) { while (channel.isOpen()) {
if (fireOpen) {
fireOpen = false;
fireChannelOpen(channel);
fireChannelBound(channel, channel.getLocalAddress());
fireChannelConnected(channel, channel.getRemoteAddress());
}
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) { while (!channel.isReadable()) {
try { try {