Fixed performance regression which occurs when a user tries to write something in channelConnected()

This commit is contained in:
Trustin Lee 2009-07-08 19:55:34 +00:00
parent d7d0877ede
commit cae3010d6e
3 changed files with 34 additions and 13 deletions

View File

@ -41,13 +41,17 @@ import org.jboss.netty.channel.ChannelSink;
*/ */
final class NioAcceptedSocketChannel extends NioSocketChannel { final class NioAcceptedSocketChannel extends NioSocketChannel {
final Thread bossThread;
NioAcceptedSocketChannel( NioAcceptedSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline, ChannelFactory factory, ChannelPipeline pipeline,
Channel parent, ChannelSink sink, Channel parent, ChannelSink sink,
SocketChannel socket, NioWorker worker) { SocketChannel socket, NioWorker worker, Thread bossThread) {
super(parent, factory, pipeline, sink, socket, worker); super(parent, factory, pipeline, sink, socket, worker);
this.bossThread = bossThread;
fireChannelOpen(this); fireChannelOpen(this);
fireChannelBound(this, getLocalAddress()); fireChannelBound(this, getLocalAddress());
fireChannelConnected(this, getRemoteAddress()); fireChannelConnected(this, getRemoteAddress());

View File

@ -223,6 +223,8 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
} }
public void run() { public void run() {
final Thread currentThread = Thread.currentThread();
for (;;) { for (;;) {
try { try {
if (selector.select(1000) > 0) { if (selector.select(1000) > 0) {
@ -231,7 +233,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
SocketChannel acceptedSocket = channel.socket.accept(); SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) { if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket); registerAcceptedChannel(acceptedSocket, currentThread);
} }
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException // Thrown every second to get ClosedChannelException
@ -257,7 +259,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
closeSelector(); closeSelector();
} }
private void registerAcceptedChannel(SocketChannel acceptedSocket) { private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
try { try {
ChannelPipeline pipeline = ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline(); channel.getConfig().getPipelineFactory().getPipeline();
@ -265,7 +267,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
worker.register(new NioAcceptedSocketChannel( worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel, channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket, NioServerSocketPipelineSink.this, acceptedSocket,
worker), null); worker, currentThread), null);
} catch (Exception e) { } catch (Exception e) {
logger.warn( logger.warn(
"Failed to initialize an accepted socket.", e); "Failed to initialize an accepted socket.", e);

View File

@ -363,23 +363,38 @@ class NioWorker implements Runnable {
} else { } else {
writeNow(channel, channel.getConfig().getWriteSpinCount()); writeNow(channel, channel.getConfig().getWriteSpinCount());
} }
} }
private static boolean scheduleWriteIfNecessary(NioSocketChannel channel) { private static boolean scheduleWriteIfNecessary(final NioSocketChannel channel) {
NioWorker worker = channel.worker; final NioWorker worker = channel.worker;
Thread workerThread = worker.thread; final Thread currentThread = Thread.currentThread();
if (workerThread == null || Thread.currentThread() != workerThread) { final Thread workerThread = worker.thread;
if (workerThread == null || currentThread != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
boolean offered = worker.writeTaskQueue.offer(channel.writeTask); boolean offered = worker.writeTaskQueue.offer(channel.writeTask);
assert offered; assert offered;
} }
Selector workerSelector = worker.selector;
if (workerSelector != null) { if (!(channel instanceof NioAcceptedSocketChannel) ||
if (worker.wakenUp.compareAndSet(false, true)) { ((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
workerSelector.wakeup(); final Selector workerSelector = worker.selector;
if (workerSelector != null) {
if (worker.wakenUp.compareAndSet(false, true)) {
workerSelector.wakeup();
}
} }
} else {
// A write request can be made from an acceptor thread (boss)
// when a user attempted to write something in:
//
// * channelOpen()
// * channelBound()
// * channelConnected().
//
// In this case, there's no need to wake up the selector because
// the channel is not even registered yet at this moment.
} }
return true; return true;
} }