From 470c1a898a6cdb5d20993e63d9e5b83c298290a9 Mon Sep 17 00:00:00 2001 From: norman Date: Wed, 11 Apr 2012 08:45:51 +0200 Subject: [PATCH] Fix a bug which lead to only use two threads for all tasks all the time, even if the WorkerPool contained more. See #240 --- .../io/netty/channel/socket/nio/AbstractNioWorker.java | 6 ++++-- .../netty/channel/socket/nio/NioServerSocketChannel.java | 8 +++++--- .../channel/socket/nio/NioServerSocketChannelFactory.java | 4 ++-- .../channel/socket/nio/NioServerSocketPipelineSink.java | 8 +------- .../main/java/io/netty/channel/socket/nio/NioWorker.java | 3 +-- 5 files changed, 13 insertions(+), 16 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 1db5154b27..c1df2bdcbd 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -475,8 +475,10 @@ abstract class AbstractNioWorker implements Worker { // TODO: Remove the casting stuff ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); - registerTask(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel, - channel.getPipeline().getSink(), acceptedSocket, (NioWorker) this), null); + NioWorker worker = channel.workers.nextWorker(); + + worker.registerWithWorker(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel, + channel.getPipeline().getSink(), acceptedSocket, worker), null); handled = true; } return handled; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 608537e90a..db7bfc9a80 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -42,14 +42,15 @@ final class NioServerSocketChannel extends AbstractServerChannel final ServerSocketChannel socket; final Lock shutdownLock = new ReentrantLock(); final NioWorker worker; + final WorkerPool workers; private final ServerSocketChannelConfig config; static NioServerSocketChannel create(ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) { + ChannelPipeline pipeline, ChannelSink sink, NioWorker worker, WorkerPool workers) { NioServerSocketChannel instance = - new NioServerSocketChannel(factory, pipeline, sink, worker); + new NioServerSocketChannel(factory, pipeline, sink, worker, workers); fireChannelOpen(instance); return instance; } @@ -57,10 +58,11 @@ final class NioServerSocketChannel extends AbstractServerChannel private NioServerSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, - ChannelSink sink, NioWorker worker) { + ChannelSink sink, NioWorker worker, WorkerPool workers) { super(factory, pipeline, sink); this.worker = worker; + this.workers = workers; try { socket = ServerSocketChannel.open(); } catch (IOException e) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java index 57ed6ecc34..33b0c4fa67 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java @@ -137,13 +137,13 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory } this.workerPool = workerPool; - sink = new NioServerSocketPipelineSink(workerPool); + sink = new NioServerSocketPipelineSink(); } @Override public ServerSocketChannel newChannel(ChannelPipeline pipeline) { - return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker()); + return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker(), workerPool); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java index dbcf749f74..278f5d8873 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -35,12 +35,6 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); - private final WorkerPool workerPool; - - NioServerSocketPipelineSink(WorkerPool workerPool) { - this.workerPool = workerPool; - } - @Override public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { @@ -124,7 +118,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { future.setSuccess(); fireChannelBound(channel, channel.getLocalAddress()); - workerPool.nextWorker().registerWithWorker(channel, future); + channel.getWorker().registerWithWorker(channel, future); } catch (Throwable t) { future.setFailure(t); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java index 92c18307c3..3c1554f567 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java @@ -127,8 +127,7 @@ public class NioWorker extends AbstractNioWorker { boolean registered = channel.getJdkChannel().isRegistered(); if (!registered) { synchronized (channel.interestOpsLock) { - channel.getJdkChannel().register( - selector, channel.getRawInterestOps(), channel); + channel.getJdkChannel().register(selector, channel.getRawInterestOps(), channel); } } else {