Fix a bug which lead to only use two threads for all tasks all the time, even if the WorkerPool contained more. See #240
This commit is contained in:
parent
a37d7bb5f3
commit
470c1a898a
@ -475,8 +475,10 @@ abstract class AbstractNioWorker implements Worker {
|
|||||||
// TODO: Remove the casting stuff
|
// TODO: Remove the casting stuff
|
||||||
ChannelPipeline pipeline =
|
ChannelPipeline pipeline =
|
||||||
channel.getConfig().getPipelineFactory().getPipeline();
|
channel.getConfig().getPipelineFactory().getPipeline();
|
||||||
registerTask(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel,
|
NioWorker worker = channel.workers.nextWorker();
|
||||||
channel.getPipeline().getSink(), acceptedSocket, (NioWorker) this), null);
|
|
||||||
|
worker.registerWithWorker(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel,
|
||||||
|
channel.getPipeline().getSink(), acceptedSocket, worker), null);
|
||||||
handled = true;
|
handled = true;
|
||||||
}
|
}
|
||||||
return handled;
|
return handled;
|
||||||
|
@ -42,14 +42,15 @@ final class NioServerSocketChannel extends AbstractServerChannel
|
|||||||
final ServerSocketChannel socket;
|
final ServerSocketChannel socket;
|
||||||
final Lock shutdownLock = new ReentrantLock();
|
final Lock shutdownLock = new ReentrantLock();
|
||||||
final NioWorker worker;
|
final NioWorker worker;
|
||||||
|
final WorkerPool<NioWorker> workers;
|
||||||
|
|
||||||
|
|
||||||
private final ServerSocketChannelConfig config;
|
private final ServerSocketChannelConfig config;
|
||||||
|
|
||||||
static NioServerSocketChannel create(ChannelFactory factory,
|
static NioServerSocketChannel create(ChannelFactory factory,
|
||||||
ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) {
|
ChannelPipeline pipeline, ChannelSink sink, NioWorker worker, WorkerPool<NioWorker> workers) {
|
||||||
NioServerSocketChannel instance =
|
NioServerSocketChannel instance =
|
||||||
new NioServerSocketChannel(factory, pipeline, sink, worker);
|
new NioServerSocketChannel(factory, pipeline, sink, worker, workers);
|
||||||
fireChannelOpen(instance);
|
fireChannelOpen(instance);
|
||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
@ -57,10 +58,11 @@ final class NioServerSocketChannel extends AbstractServerChannel
|
|||||||
private NioServerSocketChannel(
|
private NioServerSocketChannel(
|
||||||
ChannelFactory factory,
|
ChannelFactory factory,
|
||||||
ChannelPipeline pipeline,
|
ChannelPipeline pipeline,
|
||||||
ChannelSink sink, NioWorker worker) {
|
ChannelSink sink, NioWorker worker, WorkerPool<NioWorker> workers) {
|
||||||
|
|
||||||
super(factory, pipeline, sink);
|
super(factory, pipeline, sink);
|
||||||
this.worker = worker;
|
this.worker = worker;
|
||||||
|
this.workers = workers;
|
||||||
try {
|
try {
|
||||||
socket = ServerSocketChannel.open();
|
socket = ServerSocketChannel.open();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -137,13 +137,13 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.workerPool = workerPool;
|
this.workerPool = workerPool;
|
||||||
sink = new NioServerSocketPipelineSink(workerPool);
|
sink = new NioServerSocketPipelineSink();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
|
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
|
||||||
return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker());
|
return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker(), workerPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -35,12 +35,6 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
|
|||||||
static final InternalLogger logger =
|
static final InternalLogger logger =
|
||||||
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
|
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
|
||||||
|
|
||||||
private final WorkerPool<NioWorker> workerPool;
|
|
||||||
|
|
||||||
NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
|
|
||||||
this.workerPool = workerPool;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void eventSunk(
|
public void eventSunk(
|
||||||
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
|
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
|
||||||
@ -124,7 +118,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
|
|||||||
future.setSuccess();
|
future.setSuccess();
|
||||||
fireChannelBound(channel, channel.getLocalAddress());
|
fireChannelBound(channel, channel.getLocalAddress());
|
||||||
|
|
||||||
workerPool.nextWorker().registerWithWorker(channel, future);
|
channel.getWorker().registerWithWorker(channel, future);
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
future.setFailure(t);
|
future.setFailure(t);
|
||||||
|
@ -127,8 +127,7 @@ public class NioWorker extends AbstractNioWorker {
|
|||||||
boolean registered = channel.getJdkChannel().isRegistered();
|
boolean registered = channel.getJdkChannel().isRegistered();
|
||||||
if (!registered) {
|
if (!registered) {
|
||||||
synchronized (channel.interestOpsLock) {
|
synchronized (channel.interestOpsLock) {
|
||||||
channel.getJdkChannel().register(
|
channel.getJdkChannel().register(selector, channel.getRawInterestOps(), channel);
|
||||||
selector, channel.getRawInterestOps(), channel);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user