Make sure we use the same Worker in the client during its lifetime. See

#240
This commit is contained in:
norman 2012-03-29 12:02:29 +02:00
parent 60d9364604
commit dded63b22c
9 changed files with 35 additions and 63 deletions

View File

@ -127,10 +127,10 @@ abstract class AbstractNioWorker implements Worker {
public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) {
this.executor = executor;
this.allowShutdownOnIdle = allowShutdownOnIdle;
}
public final void registerWithWorker(final Channel channel, final ChannelFuture future) {
final Selector selector = start();
try {
@ -141,7 +141,7 @@ abstract class AbstractNioWorker implements Worker {
@Override
public void run() {
try {
ch.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
ch.socket.register(selector, SelectionKey.OP_ACCEPT, ch);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
@ -157,7 +157,7 @@ abstract class AbstractNioWorker implements Worker {
public void run() {
try {
try {
clientChannel.channel.register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE, channel);
clientChannel.channel.register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel);
} catch (ClosedChannelException e) {
clientChannel.getWorker().close(clientChannel, succeededFuture(channel));
}
@ -196,7 +196,6 @@ abstract class AbstractNioWorker implements Worker {
fireExceptionCaught(channel, t);
}
}
/**
@ -248,6 +247,7 @@ abstract class AbstractNioWorker implements Worker {
boolean shutdown = false;
Selector selector = this.selector;
for (;;) {
wakenUp.set(false);
if (CONSTRAINT_LEVEL != 0) {
@ -291,7 +291,7 @@ abstract class AbstractNioWorker implements Worker {
if (wakenUp.get()) {
selector.wakeup();
}
cancelledKeys = 0;
processRegisterTaskQueue();
processEventQueue();
@ -398,11 +398,13 @@ abstract class AbstractNioWorker implements Worker {
}
private void processWriteTaskQueue() throws IOException {
for (;;) {
final Runnable task = writeTaskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
@ -532,10 +534,10 @@ abstract class AbstractNioWorker implements Worker {
}
private void connect(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
final NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
try {
if (ch.channel.isConnectionPending() && ch.channel.finishConnect()) {
registerTask(ch, ch.connectFuture);
if (ch.channel.finishConnect()) {
registerTask(ch, ch.connectFuture);
}
} catch (Throwable t) {
ch.connectFuture.setFailure(t);
@ -570,6 +572,7 @@ abstract class AbstractNioWorker implements Worker {
}
void writeFromUserCode(final AbstractNioChannel<?> channel) {
if (!channel.isConnected()) {
cleanUpWriteBuffer(channel);
return;

View File

@ -19,5 +19,9 @@ import io.netty.channel.Channel;
public interface NioChannel extends Channel {
/**
* Returns the {@link AbstractNioWorker} which handles the IO of the {@link Channel}
*
*/
AbstractNioWorker getWorker();
}

View File

@ -22,6 +22,7 @@ import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.ClientSocketChannelFactory;
import io.netty.channel.socket.SocketChannel;
@ -81,7 +82,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
private final WorkerPool<NioWorker> workerPool;
private final NioClientSocketPipelineSink sink;
private final ChannelSink sink;
/**
* Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} for the worker executor.
@ -129,13 +130,13 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
this.workerPool = workerPool;
sink = new NioClientSocketPipelineSink(workerPool);
sink = new NioClientSocketPipelineSink();
}
@Override
public SocketChannel newChannel(ChannelPipeline pipeline) {
return NioClientSocketChannel.create(this, pipeline, sink, sink.nextWorker());
return NioClientSocketChannel.create(this, pipeline, sink, workerPool.nextWorker());
}
@Override

View File

@ -36,12 +36,6 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
private final WorkerPool<NioWorker> workerPool;
NioClientSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
this.workerPool = workerPool;
}
@Override
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
@ -106,7 +100,6 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
SocketAddress remoteAddress) {
try {
channel.channel.connect(remoteAddress);
channel.getCloseFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
@ -118,18 +111,14 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
});
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = cf;
nextWorker().registerWithWorker(channel, cf);
//nextBoss().register(channel);
channel.getWorker().registerWithWorker(channel, cf);
} catch (Throwable t) {
t.printStackTrace();
cf.setFailure(t);
fireExceptionCaught(channel, t);
channel.getWorker().close(channel, succeededFuture(channel));
}
}
NioWorker nextWorker() {
return workerPool.nextWorker();
}
}

View File

@ -21,6 +21,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelFactory;
@ -76,7 +77,7 @@ import io.netty.util.ExternalResourceReleasable;
*/
public class NioDatagramChannelFactory implements DatagramChannelFactory {
private final NioDatagramPipelineSink sink;
private final ChannelSink sink;
private final WorkerPool<NioDatagramWorker> workerPool;
@ -124,12 +125,12 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
*/
public NioDatagramChannelFactory(WorkerPool<NioDatagramWorker> workerPool) {
this.workerPool = workerPool;
sink = new NioDatagramPipelineSink(workerPool);
sink = new NioDatagramPipelineSink();
}
@Override
public DatagramChannel newChannel(final ChannelPipeline pipeline) {
return NioDatagramChannel.create(this, pipeline, sink, sink.nextWorker());
return NioDatagramChannel.create(this, pipeline, sink, workerPool.nextWorker());
}
@Override

View File

@ -19,7 +19,6 @@ import static io.netty.channel.Channels.*;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
@ -35,22 +34,6 @@ import io.netty.channel.MessageEvent;
*/
class NioDatagramPipelineSink extends AbstractNioChannelSink {
private final WorkerPool<NioDatagramWorker> workerPool;
/**
* Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount.
* The {@link NioDatagramWorker}s take care of reading and writing for the {@link NioDatagramChannel}.
*
* @param workerExecutor
* the {@link Executor} that will run the {@link NioDatagramWorker}s
* for this sink
* @param workerCount
* the number of {@link NioDatagramWorker}s for this sink
*/
NioDatagramPipelineSink(final WorkerPool<NioDatagramWorker> workerPool) {
this.workerPool = workerPool;
}
/**
* Handle downstream event.
*
@ -183,9 +166,4 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
}
}
}
NioDatagramWorker nextWorker() {
return workerPool.nextWorker();
}
}

View File

@ -22,6 +22,7 @@ import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelFactory;
@ -84,7 +85,7 @@ import io.netty.util.ExternalResourceReleasable;
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
private final WorkerPool<NioWorker> workerPool;
private final NioServerSocketPipelineSink sink;
private final ChannelSink sink;
/**
* Create a new {@link NioServerSocketChannelFactory} using
@ -142,7 +143,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
@Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return NioServerSocketChannel.create(this, pipeline, sink, sink.nextWorker());
return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker());
}
@Override

View File

@ -67,14 +67,14 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future);
channel.getWorker().close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
channel.worker.close(channel, future);
channel.getWorker().close(channel, future);
}
break;
}
@ -116,7 +116,6 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
@ -125,21 +124,15 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
nextWorker().registerWithWorker(channel, future);
workerPool.nextWorker().registerWithWorker(channel, future);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bound) {
channel.worker.close(channel, future);
channel.getWorker().close(channel, future);
}
}
}
NioWorker nextWorker() {
return workerPool.nextWorker();
}
}

View File

@ -165,6 +165,8 @@ public class NioWorker extends AbstractNioWorker {
selector, channel.getRawInterestOps(), channel);
}
} else {
setInterestOps(channel, future, channel.getRawInterestOps());
}
if (future != null) {
((NioSocketChannel) channel).setConnected();