Fix a bug which lead to only use two threads for all tasks all the time, even if the WorkerPool contained more. See #240

This commit is contained in:
norman 2012-04-11 08:45:51 +02:00
parent 4c0a5886ca
commit f88cd3120d
5 changed files with 13 additions and 16 deletions

View File

@ -42,14 +42,15 @@ final class NioServerSocketChannel extends AbstractServerChannel
final ServerSocketChannel socket; final ServerSocketChannel socket;
final Lock shutdownLock = new ReentrantLock(); final Lock shutdownLock = new ReentrantLock();
final NioWorker worker; final NioWorker worker;
final WorkerPool<NioWorker> workers;
private final ServerSocketChannelConfig config; private final ServerSocketChannelConfig config;
static NioServerSocketChannel create(ChannelFactory factory, static NioServerSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) { ChannelPipeline pipeline, ChannelSink sink, NioWorker worker, WorkerPool<NioWorker> workers) {
NioServerSocketChannel instance = NioServerSocketChannel instance =
new NioServerSocketChannel(factory, pipeline, sink, worker); new NioServerSocketChannel(factory, pipeline, sink, worker, workers);
fireChannelOpen(instance); fireChannelOpen(instance);
return instance; return instance;
} }
@ -57,10 +58,11 @@ final class NioServerSocketChannel extends AbstractServerChannel
private NioServerSocketChannel( private NioServerSocketChannel(
ChannelFactory factory, ChannelFactory factory,
ChannelPipeline pipeline, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) { ChannelSink sink, NioWorker worker, WorkerPool<NioWorker> workers) {
super(factory, pipeline, sink); super(factory, pipeline, sink);
this.worker = worker; this.worker = worker;
this.workers = workers;
try { try {
socket = ServerSocketChannel.open(); socket = ServerSocketChannel.open();
} catch (IOException e) { } catch (IOException e) {

View File

@ -137,13 +137,13 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
} }
this.workerPool = workerPool; this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink(workerPool); sink = new NioServerSocketPipelineSink();
} }
@Override @Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker()); return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker(), workerPool);
} }
@Override @Override

View File

@ -35,12 +35,6 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
private final WorkerPool<NioWorker> workerPool;
NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
this.workerPool = workerPool;
}
@Override @Override
public void eventSunk( public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception { ChannelPipeline pipeline, ChannelEvent e) throws Exception {
@ -124,7 +118,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
future.setSuccess(); future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress()); fireChannelBound(channel, channel.getLocalAddress());
workerPool.nextWorker().registerWithWorker(channel, future); channel.getWorker().registerWithWorker(channel, future);
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);

View File

@ -127,8 +127,7 @@ public class NioWorker extends SelectorEventLoop {
boolean registered = channel.getJdkChannel().isRegistered(); boolean registered = channel.getJdkChannel().isRegistered();
if (!registered) { if (!registered) {
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
channel.getJdkChannel().register( channel.getJdkChannel().register(selector, channel.getRawInterestOps(), channel);
selector, channel.getRawInterestOps(), channel);
} }
} else { } else {

View File

@ -346,8 +346,10 @@ abstract class SelectorEventLoop extends SingleThreadEventLoop {
// TODO: Remove the casting stuff // TODO: Remove the casting stuff
ChannelPipeline pipeline = ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline(); channel.getConfig().getPipelineFactory().getPipeline();
registerTask(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel, NioWorker worker = channel.workers.nextWorker();
channel.getPipeline().getSink(), acceptedSocket, (NioWorker) this), null);
worker.registerWithWorker(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel,
channel.getPipeline().getSink(), acceptedSocket, worker), null);
handled = true; handled = true;
} }
return handled; return handled;