From cae3010d6e9a3588b6aebb38adfe3382aa5913b8 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 8 Jul 2009 19:55:34 +0000 Subject: [PATCH] Fixed performance regression which occurs when a user tries to write something in channelConnected() --- .../socket/nio/NioAcceptedSocketChannel.java | 6 +++- .../nio/NioServerSocketPipelineSink.java | 8 +++-- .../netty/channel/socket/nio/NioWorker.java | 33 ++++++++++++++----- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java index 304ad08a2d..661d0f3824 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java @@ -41,13 +41,17 @@ import org.jboss.netty.channel.ChannelSink; */ final class NioAcceptedSocketChannel extends NioSocketChannel { + final Thread bossThread; + NioAcceptedSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, Channel parent, ChannelSink sink, - SocketChannel socket, NioWorker worker) { + SocketChannel socket, NioWorker worker, Thread bossThread) { super(parent, factory, pipeline, sink, socket, worker); + this.bossThread = bossThread; + fireChannelOpen(this); fireChannelBound(this, getLocalAddress()); fireChannelConnected(this, getRemoteAddress()); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index cb529f0103..476afe846a 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -223,6 +223,8 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { } public void run() { + final Thread currentThread = Thread.currentThread(); + for (;;) { try { if (selector.select(1000) > 0) { @@ -231,7 +233,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { SocketChannel acceptedSocket = channel.socket.accept(); if (acceptedSocket != null) { - registerAcceptedChannel(acceptedSocket); + registerAcceptedChannel(acceptedSocket, currentThread); } } catch (SocketTimeoutException e) { // Thrown every second to get ClosedChannelException @@ -257,7 +259,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { closeSelector(); } - private void registerAcceptedChannel(SocketChannel acceptedSocket) { + private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { try { ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); @@ -265,7 +267,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { worker.register(new NioAcceptedSocketChannel( channel.getFactory(), pipeline, channel, NioServerSocketPipelineSink.this, acceptedSocket, - worker), null); + worker, currentThread), null); } catch (Exception e) { logger.warn( "Failed to initialize an accepted socket.", e); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 20c1528e73..4a18f17e5f 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -363,23 +363,38 @@ class NioWorker implements Runnable { } else { writeNow(channel, channel.getConfig().getWriteSpinCount()); } - } - private static boolean scheduleWriteIfNecessary(NioSocketChannel channel) { - NioWorker worker = channel.worker; - Thread workerThread = worker.thread; - if (workerThread == null || Thread.currentThread() != workerThread) { + private static boolean scheduleWriteIfNecessary(final NioSocketChannel channel) { + final NioWorker worker = channel.worker; + final Thread currentThread = Thread.currentThread(); + final Thread workerThread = worker.thread; + if (workerThread == null || currentThread != workerThread) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { boolean offered = worker.writeTaskQueue.offer(channel.writeTask); assert offered; } - Selector workerSelector = worker.selector; - if (workerSelector != null) { - if (worker.wakenUp.compareAndSet(false, true)) { - workerSelector.wakeup(); + + if (!(channel instanceof NioAcceptedSocketChannel) || + ((NioAcceptedSocketChannel) channel).bossThread != currentThread) { + final Selector workerSelector = worker.selector; + if (workerSelector != null) { + if (worker.wakenUp.compareAndSet(false, true)) { + workerSelector.wakeup(); + } } + } else { + // A write request can be made from an acceptor thread (boss) + // when a user attempted to write something in: + // + // * channelOpen() + // * channelBound() + // * channelConnected(). + // + // In this case, there's no need to wake up the selector because + // the channel is not even registered yet at this moment. } + return true; }