From dded63b22c9521508cca7141322810929d8743f4 Mon Sep 17 00:00:00 2001 From: norman Date: Thu, 29 Mar 2012 12:02:29 +0200 Subject: [PATCH] Make sure we use the same Worker in the client during its lifetime. See #240 --- .../channel/socket/nio/AbstractNioWorker.java | 19 +++++++++------- .../netty/channel/socket/nio/NioChannel.java | 4 ++++ .../nio/NioClientSocketChannelFactory.java | 7 +++--- .../nio/NioClientSocketPipelineSink.java | 17 +++----------- .../socket/nio/NioDatagramChannelFactory.java | 7 +++--- .../socket/nio/NioDatagramPipelineSink.java | 22 ------------------- .../nio/NioServerSocketChannelFactory.java | 5 +++-- .../nio/NioServerSocketPipelineSink.java | 15 ++++--------- .../netty/channel/socket/nio/NioWorker.java | 2 ++ 9 files changed, 35 insertions(+), 63 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 84612731f4..051a4b185e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -127,10 +127,10 @@ abstract class AbstractNioWorker implements Worker { public AbstractNioWorker(Executor executor, boolean allowShutdownOnIdle) { this.executor = executor; this.allowShutdownOnIdle = allowShutdownOnIdle; + } public final void registerWithWorker(final Channel channel, final ChannelFuture future) { - final Selector selector = start(); try { @@ -141,7 +141,7 @@ abstract class AbstractNioWorker implements Worker { @Override public void run() { try { - ch.socket.register(selector, SelectionKey.OP_ACCEPT, channel); + ch.socket.register(selector, SelectionKey.OP_ACCEPT, ch); } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); @@ -157,7 +157,7 @@ abstract class AbstractNioWorker implements Worker { public void run() { try { try { - clientChannel.channel.register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE, channel); + clientChannel.channel.register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT, clientChannel); } catch (ClosedChannelException e) { clientChannel.getWorker().close(clientChannel, succeededFuture(channel)); } @@ -196,7 +196,6 @@ abstract class AbstractNioWorker implements Worker { fireExceptionCaught(channel, t); } - } /** @@ -248,6 +247,7 @@ abstract class AbstractNioWorker implements Worker { boolean shutdown = false; Selector selector = this.selector; for (;;) { + wakenUp.set(false); if (CONSTRAINT_LEVEL != 0) { @@ -291,7 +291,7 @@ abstract class AbstractNioWorker implements Worker { if (wakenUp.get()) { selector.wakeup(); } - + cancelledKeys = 0; processRegisterTaskQueue(); processEventQueue(); @@ -398,11 +398,13 @@ abstract class AbstractNioWorker implements Worker { } private void processWriteTaskQueue() throws IOException { + for (;;) { final Runnable task = writeTaskQueue.poll(); if (task == null) { break; } + task.run(); cleanUpCancelledKeys(); } @@ -532,10 +534,10 @@ abstract class AbstractNioWorker implements Worker { } private void connect(SelectionKey k) { - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + final NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); try { - if (ch.channel.isConnectionPending() && ch.channel.finishConnect()) { - registerTask(ch, ch.connectFuture); + if (ch.channel.finishConnect()) { + registerTask(ch, ch.connectFuture); } } catch (Throwable t) { ch.connectFuture.setFailure(t); @@ -570,6 +572,7 @@ abstract class AbstractNioWorker implements Worker { } void writeFromUserCode(final AbstractNioChannel channel) { + if (!channel.isConnected()) { cleanUpWriteBuffer(channel); return; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java index 522dcebbd6..1f6dd63977 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java @@ -19,5 +19,9 @@ import io.netty.channel.Channel; public interface NioChannel extends Channel { + /** + * Returns the {@link AbstractNioWorker} which handles the IO of the {@link Channel} + * + */ AbstractNioWorker getWorker(); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java index 4777e95bc4..0b9af5f713 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -22,6 +22,7 @@ import java.util.concurrent.RejectedExecutionException; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelSink; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.ClientSocketChannelFactory; import io.netty.channel.socket.SocketChannel; @@ -81,7 +82,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory private final WorkerPool workerPool; - private final NioClientSocketPipelineSink sink; + private final ChannelSink sink; /** * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} for the worker executor. @@ -129,13 +130,13 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory this.workerPool = workerPool; - sink = new NioClientSocketPipelineSink(workerPool); + sink = new NioClientSocketPipelineSink(); } @Override public SocketChannel newChannel(ChannelPipeline pipeline) { - return NioClientSocketChannel.create(this, pipeline, sink, sink.nextWorker()); + return NioClientSocketChannel.create(this, pipeline, sink, workerPool.nextWorker()); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 62e1bc4ec4..2bbf690f3a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -36,12 +36,6 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); - private final WorkerPool workerPool; - - NioClientSocketPipelineSink(WorkerPool workerPool) { - this.workerPool = workerPool; - } - @Override public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { @@ -106,7 +100,6 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { SocketAddress remoteAddress) { try { channel.channel.connect(remoteAddress); - channel.getCloseFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) @@ -118,18 +111,14 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { }); cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); channel.connectFuture = cf; - nextWorker().registerWithWorker(channel, cf); - //nextBoss().register(channel); - + + channel.getWorker().registerWithWorker(channel, cf); } catch (Throwable t) { + t.printStackTrace(); cf.setFailure(t); fireExceptionCaught(channel, t); channel.getWorker().close(channel, succeededFuture(channel)); } } - - NioWorker nextWorker() { - return workerPool.nextWorker(); - } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java index f12668a15b..f011654b6e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelFactory.java @@ -21,6 +21,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelSink; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelFactory; @@ -76,7 +77,7 @@ import io.netty.util.ExternalResourceReleasable; */ public class NioDatagramChannelFactory implements DatagramChannelFactory { - private final NioDatagramPipelineSink sink; + private final ChannelSink sink; private final WorkerPool workerPool; @@ -124,12 +125,12 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory { */ public NioDatagramChannelFactory(WorkerPool workerPool) { this.workerPool = workerPool; - sink = new NioDatagramPipelineSink(workerPool); + sink = new NioDatagramPipelineSink(); } @Override public DatagramChannel newChannel(final ChannelPipeline pipeline) { - return NioDatagramChannel.create(this, pipeline, sink, sink.nextWorker()); + return NioDatagramChannel.create(this, pipeline, sink, workerPool.nextWorker()); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java index 78fcc34388..2c2ee7ddc3 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -19,7 +19,6 @@ import static io.netty.channel.Channels.*; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.concurrent.Executor; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -35,22 +34,6 @@ import io.netty.channel.MessageEvent; */ class NioDatagramPipelineSink extends AbstractNioChannelSink { - private final WorkerPool workerPool; - - /** - * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount. - * The {@link NioDatagramWorker}s take care of reading and writing for the {@link NioDatagramChannel}. - * - * @param workerExecutor - * the {@link Executor} that will run the {@link NioDatagramWorker}s - * for this sink - * @param workerCount - * the number of {@link NioDatagramWorker}s for this sink - */ - NioDatagramPipelineSink(final WorkerPool workerPool) { - this.workerPool = workerPool; - } - /** * Handle downstream event. * @@ -183,9 +166,4 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { } } } - - NioDatagramWorker nextWorker() { - return workerPool.nextWorker(); - } - } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java index e1a88875fd..57ed6ecc34 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java @@ -22,6 +22,7 @@ import java.util.concurrent.RejectedExecutionException; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelSink; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannelFactory; @@ -84,7 +85,7 @@ import io.netty.util.ExternalResourceReleasable; public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { private final WorkerPool workerPool; - private final NioServerSocketPipelineSink sink; + private final ChannelSink sink; /** * Create a new {@link NioServerSocketChannelFactory} using @@ -142,7 +143,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory @Override public ServerSocketChannel newChannel(ChannelPipeline pipeline) { - return NioServerSocketChannel.create(this, pipeline, sink, sink.nextWorker()); + return NioServerSocketChannel.create(this, pipeline, sink, workerPool.nextWorker()); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 82bb871099..dbcf749f74 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -67,14 +67,14 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; } @@ -116,7 +116,6 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { private void bind( NioServerSocketChannel channel, ChannelFuture future, SocketAddress localAddress) { - boolean bound = false; try { channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); @@ -125,21 +124,15 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { future.setSuccess(); fireChannelBound(channel, channel.getLocalAddress()); - nextWorker().registerWithWorker(channel, future); + workerPool.nextWorker().registerWithWorker(channel, future); } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); } finally { if (!bound) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } } } - - - NioWorker nextWorker() { - return workerPool.nextWorker(); - } - } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java index 861e32e6f4..0381f6a90a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java @@ -165,6 +165,8 @@ public class NioWorker extends AbstractNioWorker { selector, channel.getRawInterestOps(), channel); } + } else { + setInterestOps(channel, future, channel.getRawInterestOps()); } if (future != null) { ((NioSocketChannel) channel).setConnected();