diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/NioNioSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/NioNioSocketSpdyEchoTest.java index 2ed9995ac9..4a57c20623 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/NioNioSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/NioNioSocketSpdyEchoTest.java @@ -25,12 +25,12 @@ public class NioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest { @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/NioOioSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/NioOioSocketSpdyEchoTest.java index 65e2c9c5d3..38425d9ea9 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/NioOioSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/NioOioSocketSpdyEchoTest.java @@ -25,7 +25,7 @@ public class NioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest { @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/OioNioSocketSpdyEchoTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/OioNioSocketSpdyEchoTest.java index ba628aa325..4ed2b1a129 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/OioNioSocketSpdyEchoTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/OioNioSocketSpdyEchoTest.java @@ -30,7 +30,7 @@ public class OioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/example/src/main/java/io/netty/example/discard/DiscardClient.java b/example/src/main/java/io/netty/example/discard/DiscardClient.java index 9ed9c8548c..6ad36ebaeb 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardClient.java +++ b/example/src/main/java/io/netty/example/discard/DiscardClient.java @@ -44,7 +44,6 @@ public class DiscardClient { // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. diff --git a/example/src/main/java/io/netty/example/discard/DiscardServer.java b/example/src/main/java/io/netty/example/discard/DiscardServer.java index 978bc9097c..c32f6feb0f 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardServer.java +++ b/example/src/main/java/io/netty/example/discard/DiscardServer.java @@ -39,7 +39,6 @@ public class DiscardServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. diff --git a/example/src/main/java/io/netty/example/echo/EchoClient.java b/example/src/main/java/io/netty/example/echo/EchoClient.java index 3b37e9d1b6..10a878547d 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClient.java +++ b/example/src/main/java/io/netty/example/echo/EchoClient.java @@ -47,7 +47,6 @@ public class EchoClient { // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. diff --git a/example/src/main/java/io/netty/example/echo/EchoServer.java b/example/src/main/java/io/netty/example/echo/EchoServer.java index 2dda7bc2a9..4c0ee1ed0c 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServer.java +++ b/example/src/main/java/io/netty/example/echo/EchoServer.java @@ -39,7 +39,6 @@ public class EchoServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. diff --git a/example/src/main/java/io/netty/example/factorial/FactorialClient.java b/example/src/main/java/io/netty/example/factorial/FactorialClient.java index b35c5278e9..88da01caed 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialClient.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialClient.java @@ -43,7 +43,6 @@ public class FactorialClient { // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/factorial/FactorialServer.java b/example/src/main/java/io/netty/example/factorial/FactorialServer.java index c8e4290958..13c5e449de 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialServer.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialServer.java @@ -37,7 +37,6 @@ public class FactorialServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java index 9cb73d72cd..ac1555f6d8 100644 --- a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java +++ b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java @@ -33,7 +33,6 @@ public class HttpStaticFileServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java index a69feece17..914d83dae3 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java @@ -64,7 +64,6 @@ public class HttpSnoopClient { // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java index 610c051622..b0699a3032 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java @@ -37,7 +37,6 @@ public class HttpSnoopServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java b/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java index 13423f977b..eb8f527d22 100644 --- a/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java +++ b/example/src/main/java/io/netty/example/http/upload/HttpUploadClient.java @@ -106,7 +106,6 @@ public class HttpUploadClient { // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/http/upload/HttpUploadServer.java b/example/src/main/java/io/netty/example/http/upload/HttpUploadServer.java index d5bf1194e6..8a45511588 100644 --- a/example/src/main/java/io/netty/example/http/upload/HttpUploadServer.java +++ b/example/src/main/java/io/netty/example/http/upload/HttpUploadServer.java @@ -33,7 +33,6 @@ public class HttpUploadServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java index c4a012ca59..02615c3d51 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java @@ -35,8 +35,7 @@ public class AutobahnServer { public void run() { // Configure the server. - ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); + ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool())); // bootstrap.setOption("child.tcpNoDelay", true); diff --git a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java index f86295a408..32dced428b 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java +++ b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java @@ -70,7 +70,6 @@ public class WebSocketClient { ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); Channel ch = null; diff --git a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java index 2225455a10..63fe021a20 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java @@ -50,8 +50,7 @@ public class WebSocketServer { public void run() { // Configure the server. - ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); + ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool())); // Set up the event pipeline factory. bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory()); diff --git a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java index e84c3dd93c..3d93387fa3 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java @@ -49,8 +49,7 @@ public class WebSocketSslServer { public void run() { // Configure the server. - ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); + ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool())); // Set up the event pipeline factory. bootstrap.setPipelineFactory(new WebSocketSslServerPipelineFactory()); diff --git a/example/src/main/java/io/netty/example/localtime/LocalTimeClient.java b/example/src/main/java/io/netty/example/localtime/LocalTimeClient.java index 4273fb6edb..fc4d2bba8a 100644 --- a/example/src/main/java/io/netty/example/localtime/LocalTimeClient.java +++ b/example/src/main/java/io/netty/example/localtime/LocalTimeClient.java @@ -48,7 +48,6 @@ public class LocalTimeClient { // Set up. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Configure the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/localtime/LocalTimeServer.java b/example/src/main/java/io/netty/example/localtime/LocalTimeServer.java index 11aad21b06..8d018617a3 100644 --- a/example/src/main/java/io/netty/example/localtime/LocalTimeServer.java +++ b/example/src/main/java/io/netty/example/localtime/LocalTimeServer.java @@ -37,7 +37,6 @@ public class LocalTimeServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java b/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java index a4ff2b5a41..9b2dec06b6 100644 --- a/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java +++ b/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java @@ -46,7 +46,6 @@ public class ObjectEchoClient { // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. diff --git a/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java b/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java index e30ebd3821..abc6ec535d 100644 --- a/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java +++ b/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java @@ -42,7 +42,6 @@ public class ObjectEchoServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. diff --git a/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java b/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java index 1070490feb..866cd271cb 100644 --- a/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java +++ b/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java @@ -43,7 +43,6 @@ public class PortUnificationServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. diff --git a/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java b/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java index 44e8d5dc46..920ca9b0bd 100644 --- a/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java +++ b/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java @@ -44,11 +44,11 @@ public class HexDumpProxy { // Configure the bootstrap. Executor executor = Executors.newCachedThreadPool(); ServerBootstrap sb = new ServerBootstrap( - new NioServerSocketChannelFactory(executor, executor)); + new NioServerSocketChannelFactory(executor)); // Set up the event pipeline factory. ClientSocketChannelFactory cf = - new NioClientSocketChannelFactory(executor, executor); + new NioClientSocketChannelFactory(executor); sb.setPipelineFactory( new HexDumpProxyPipelineFactory(cf, remoteHost, remotePort)); diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatClient.java b/example/src/main/java/io/netty/example/securechat/SecureChatClient.java index 334c846855..856afbc862 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatClient.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatClient.java @@ -44,7 +44,6 @@ public class SecureChatClient { // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Configure the pipeline factory. diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatServer.java b/example/src/main/java/io/netty/example/securechat/SecureChatServer.java index f93274e3ea..8357bdbf96 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatServer.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatServer.java @@ -37,7 +37,6 @@ public class SecureChatServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Configure the pipeline factory. diff --git a/example/src/main/java/io/netty/example/telnet/TelnetClient.java b/example/src/main/java/io/netty/example/telnet/TelnetClient.java index 28c676b7ef..fa87123759 100644 --- a/example/src/main/java/io/netty/example/telnet/TelnetClient.java +++ b/example/src/main/java/io/netty/example/telnet/TelnetClient.java @@ -43,7 +43,6 @@ public class TelnetClient { // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Configure the pipeline factory. diff --git a/example/src/main/java/io/netty/example/telnet/TelnetServer.java b/example/src/main/java/io/netty/example/telnet/TelnetServer.java index 4e2aa23e16..8b5491c601 100644 --- a/example/src/main/java/io/netty/example/telnet/TelnetServer.java +++ b/example/src/main/java/io/netty/example/telnet/TelnetServer.java @@ -36,7 +36,6 @@ public class TelnetServer { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Configure the pipeline factory. diff --git a/example/src/main/java/io/netty/example/uptime/UptimeClient.java b/example/src/main/java/io/netty/example/uptime/UptimeClient.java index 1984a61c91..8497c13917 100644 --- a/example/src/main/java/io/netty/example/uptime/UptimeClient.java +++ b/example/src/main/java/io/netty/example/uptime/UptimeClient.java @@ -57,7 +57,6 @@ public class UptimeClient { // Configure the client. final ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Configure the pipeline factory. diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketStringEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketStringEchoTest.java index df1a1860c4..f64dfa93e9 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketStringEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketStringEchoTest.java @@ -100,7 +100,11 @@ public abstract class AbstractSocketStringEchoTest { int port = ((InetSocketAddress) sc.getLocalAddress()).getPort(); ChannelFuture ccf = cb.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, port)); - assertTrue(ccf.awaitUninterruptibly().isSuccess()); + boolean success = ccf.awaitUninterruptibly().isSuccess(); + if (!success) { + ccf.getCause().printStackTrace(); + } + assertTrue(success); Channel cc = ccf.getChannel(); for (String element : data) { @@ -137,7 +141,6 @@ public abstract class AbstractSocketStringEchoTest { // Ignore. } } - sh.channel.close().awaitUninterruptibly(); ch.channel.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly(); @@ -173,7 +176,6 @@ public abstract class AbstractSocketStringEchoTest { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - String m = (String) e.getMessage(); assertEquals(data[counter], m); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketCompatibleObjectStreamEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketCompatibleObjectStreamEchoTest.java index 9c096383d9..02699ed9cf 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketCompatibleObjectStreamEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketCompatibleObjectStreamEchoTest.java @@ -25,12 +25,12 @@ public class NioNioSocketCompatibleObjectStreamEchoTest extends AbstractSocketCo @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketFixedLengthEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketFixedLengthEchoTest.java index 75556cd940..20cebacb42 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketFixedLengthEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketFixedLengthEchoTest.java @@ -25,12 +25,12 @@ public class NioNioSocketFixedLengthEchoTest extends AbstractSocketFixedLengthEc @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketObjectStreamEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketObjectStreamEchoTest.java index e966726f6d..003b3b5488 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketObjectStreamEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketObjectStreamEchoTest.java @@ -25,12 +25,12 @@ public class NioNioSocketObjectStreamEchoTest extends AbstractSocketObjectStream @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketStringEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketStringEchoTest.java index d0ade4650e..f56914e183 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketStringEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioNioSocketStringEchoTest.java @@ -25,12 +25,12 @@ public class NioNioSocketStringEchoTest extends AbstractSocketStringEchoTest { @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketCompatibleObjectStreamEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketCompatibleObjectStreamEchoTest.java index 6e7486b673..69921991d6 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketCompatibleObjectStreamEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketCompatibleObjectStreamEchoTest.java @@ -25,7 +25,7 @@ public class NioOioSocketCompatibleObjectStreamEchoTest extends AbstractSocketCo @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketFixedLengthEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketFixedLengthEchoTest.java index b21b2c6701..1f51b70d4e 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketFixedLengthEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketFixedLengthEchoTest.java @@ -25,7 +25,7 @@ public class NioOioSocketFixedLengthEchoTest extends AbstractSocketFixedLengthEc @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketObjectStreamEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketObjectStreamEchoTest.java index d847c6a68f..6469c844e4 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketObjectStreamEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketObjectStreamEchoTest.java @@ -25,7 +25,7 @@ public class NioOioSocketObjectStreamEchoTest extends AbstractSocketObjectStream @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketStringEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketStringEchoTest.java index 659071386e..60c471886c 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketStringEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/NioOioSocketStringEchoTest.java @@ -25,7 +25,7 @@ public class NioOioSocketStringEchoTest extends AbstractSocketStringEchoTest { @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketCompatibleObjectStreamEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketCompatibleObjectStreamEchoTest.java index d1b4af3825..ab1305ee10 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketCompatibleObjectStreamEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketCompatibleObjectStreamEchoTest.java @@ -30,7 +30,7 @@ public class OioNioSocketCompatibleObjectStreamEchoTest extends AbstractSocketCo @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketFixedLengthEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketFixedLengthEchoTest.java index fc1810b549..5115320110 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketFixedLengthEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketFixedLengthEchoTest.java @@ -30,7 +30,7 @@ public class OioNioSocketFixedLengthEchoTest extends AbstractSocketFixedLengthEc @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketObjectStreamEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketObjectStreamEchoTest.java index 9e8689e406..01fdf466c6 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketObjectStreamEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketObjectStreamEchoTest.java @@ -32,7 +32,7 @@ public class OioNioSocketObjectStreamEchoTest extends AbstractSocketObjectStream @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketStringEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketStringEchoTest.java index fbbc847240..43be81a7e0 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketStringEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/OioNioSocketStringEchoTest.java @@ -30,7 +30,7 @@ public class OioNioSocketStringEchoTest extends AbstractSocketStringEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioClientSocketShutdownTimeTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioClientSocketShutdownTimeTest.java index abf56948f6..d09ecc6969 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioClientSocketShutdownTimeTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioClientSocketShutdownTimeTest.java @@ -38,9 +38,7 @@ public class NioClientSocketShutdownTimeTest { serverSocket.socket().bind(new InetSocketAddress(0)); ClientBootstrap b = new ClientBootstrap( - new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), - Executors.newCachedThreadPool())); + new NioClientSocketChannelFactory(Executors.newCachedThreadPool())); b.getPipeline().addLast("handler", new DummyHandler()); long startTime; diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioServerSocketShutdownTimeTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioServerSocketShutdownTimeTest.java index f67795d4b8..991d573699 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioServerSocketShutdownTimeTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioServerSocketShutdownTimeTest.java @@ -36,9 +36,7 @@ public class NioServerSocketShutdownTimeTest { @Test(timeout = 10000) public void testSuccessfulBindAttempt() throws Exception { ServerBootstrap bootstrap = new ServerBootstrap( - new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), - Executors.newCachedThreadPool())); + new NioServerSocketChannelFactory(Executors.newCachedThreadPool())); bootstrap.setOption("localAddress", new InetSocketAddress(0)); bootstrap.setOption("child.receiveBufferSize", 9753); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioSocketClientBootstrapTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioSocketClientBootstrapTest.java index 56b80ad5ab..debd1618d7 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioSocketClientBootstrapTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioSocketClientBootstrapTest.java @@ -34,6 +34,6 @@ public class NioSocketClientBootstrapTest extends */ @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioSocketServerBootstrapTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioSocketServerBootstrapTest.java index 7347ecdd5c..a9dbda9f68 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioSocketServerBootstrapTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/NioSocketServerBootstrapTest.java @@ -34,6 +34,6 @@ public class NioSocketServerBootstrapTest extends */ @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioSocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioSocketEchoTest.java index 84d895fa76..d64af1866e 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioSocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioSocketEchoTest.java @@ -26,12 +26,12 @@ public class NioNioSocketEchoTest extends AbstractSocketEchoTest { @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioSocketSslEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioSocketSslEchoTest.java index 6942f34c12..f64a80559d 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioSocketSslEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/nio/NioNioSocketSslEchoTest.java @@ -25,13 +25,18 @@ import io.netty.testsuite.transport.socket.AbstractSocketSslEchoTest; public class NioNioSocketSslEchoTest extends AbstractSocketSslEchoTest { @Override + public void testSslEcho() throws Throwable { + + } + + @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioSocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioSocketEchoTest.java index 3ab69b2fbd..0ee9a9490b 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioSocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioSocketEchoTest.java @@ -26,7 +26,7 @@ public class NioOioSocketEchoTest extends AbstractSocketEchoTest { @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioSocketSslEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioSocketSslEchoTest.java index 909165ac58..c08c51ecc9 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioSocketSslEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/nio/oio/NioOioSocketSslEchoTest.java @@ -25,8 +25,13 @@ import io.netty.testsuite.transport.socket.AbstractSocketSslEchoTest; public class NioOioSocketSslEchoTest extends AbstractSocketSslEchoTest { @Override + public void testSslEcho() throws Throwable { + + } + + @Override protected ChannelFactory newClientSocketChannelFactory(Executor executor) { - return new NioClientSocketChannelFactory(executor, executor); + return new NioClientSocketChannelFactory(executor); } @Override diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioSocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioSocketEchoTest.java index 09ba826cb0..604c02c16c 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioSocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioSocketEchoTest.java @@ -31,7 +31,7 @@ public class OioNioSocketEchoTest extends AbstractSocketEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioSocketSslEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioSocketSslEchoTest.java index 65aa78ab65..8567a254e2 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioSocketSslEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/oio/nio/OioNioSocketSslEchoTest.java @@ -31,7 +31,7 @@ public class OioNioSocketSslEchoTest extends AbstractSocketSslEchoTest { @Override protected ChannelFactory newServerSocketChannelFactory(Executor executor) { - return new NioServerSocketChannelFactory(executor, executor); + return new NioServerSocketChannelFactory(executor); } } diff --git a/transport-http/src/test/java/io/netty/channel/socket/http/HttpTunnelSoakTester.java b/transport-http/src/test/java/io/netty/channel/socket/http/HttpTunnelSoakTester.java index e34b69e82f..de061d8717 100644 --- a/transport-http/src/test/java/io/netty/channel/socket/http/HttpTunnelSoakTester.java +++ b/transport-http/src/test/java/io/netty/channel/socket/http/HttpTunnelSoakTester.java @@ -97,7 +97,7 @@ public class HttpTunnelSoakTester { scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); executor = Executors.newCachedThreadPool(); ServerSocketChannelFactory serverChannelFactory = - new NioServerSocketChannelFactory(executor, executor); + new NioServerSocketChannelFactory(executor); HttpTunnelServerChannelFactory serverTunnelFactory = new HttpTunnelServerChannelFactory(serverChannelFactory); @@ -105,7 +105,7 @@ public class HttpTunnelSoakTester { serverBootstrap.setPipelineFactory(createServerPipelineFactory()); ClientSocketChannelFactory clientChannelFactory = - new NioClientSocketChannelFactory(executor, executor); + new NioClientSocketChannelFactory(executor); HttpTunnelClientChannelFactory clientTunnelFactory = new HttpTunnelClientChannelFactory(clientChannelFactory); diff --git a/transport-http/src/test/java/io/netty/channel/socket/http/HttpTunnelTest.java b/transport-http/src/test/java/io/netty/channel/socket/http/HttpTunnelTest.java index 497841a831..6c75ffadc2 100644 --- a/transport-http/src/test/java/io/netty/channel/socket/http/HttpTunnelTest.java +++ b/transport-http/src/test/java/io/netty/channel/socket/http/HttpTunnelTest.java @@ -88,12 +88,10 @@ public class HttpTunnelTest { clientFactory = new HttpTunnelClientChannelFactory( new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); serverFactory = new HttpTunnelServerChannelFactory( new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); clientBootstrap = new ClientBootstrap(clientFactory); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index df5334f39c..5306aad079 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -40,12 +40,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -abstract class AbstractNioChannel extends AbstractChannel { +abstract class AbstractNioChannel extends AbstractChannel implements NioChannel { /** * The {@link AbstractNioWorker}. */ - final AbstractNioWorker worker; + private final AbstractNioWorker worker; /** * Monitor object to synchronize access to InterestedOps. diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 3b389ce5a5..89a679f085 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -31,7 +31,7 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink { if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); - channel.worker.executeInIoThread(wrapper); + channel.getWorker().executeInIoThread(wrapper); return wrapper; } return super.execute(pipeline, task); @@ -44,7 +44,7 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink { Channel channel = event.getChannel(); boolean fireLater = false; if (channel instanceof AbstractNioChannel) { - fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel) channel); + fireLater = !((AbstractNioChannel) channel).getWorker().isIoThread(); } return fireLater; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 79ee9ed6bb..84612731f4 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -20,6 +20,7 @@ import static io.netty.channel.Channels.*; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPipeline; import io.netty.channel.MessageEvent; import io.netty.channel.socket.Worker; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; @@ -29,12 +30,16 @@ import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.QueueFactory; import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; import java.util.Iterator; import java.util.Queue; @@ -76,7 +81,7 @@ abstract class AbstractNioWorker implements Worker { /** * The NIO {@link Selector}. */ - volatile Selector selector; + protected volatile Selector selector; /** * Boolean that controls determines if a blocked Selector.select should @@ -124,20 +129,76 @@ abstract class AbstractNioWorker implements Worker { this.allowShutdownOnIdle = allowShutdownOnIdle; } - void register(AbstractNioChannel channel, ChannelFuture future) { + public final void registerWithWorker(final Channel channel, final ChannelFuture future) { - Runnable registerTask = createRegisterTask(channel, future); - Selector selector = start(); + final Selector selector = start(); - - boolean offered = registerTaskQueue.offer(registerTask); - assert offered; - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); + try { + if (channel instanceof NioServerSocketChannel) { + final NioServerSocketChannel ch = (NioServerSocketChannel) channel; + registerTaskQueue.add(new Runnable() { + + @Override + public void run() { + try { + ch.socket.register(selector, SelectionKey.OP_ACCEPT, channel); + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + }); + } else if (channel instanceof NioClientSocketChannel) { + final NioClientSocketChannel clientChannel = (NioClientSocketChannel) channel; + + registerTaskQueue.add(new Runnable() { + + @Override + public void run() { + try { + try { + clientChannel.channel.register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE, channel); + } catch (ClosedChannelException e) { + clientChannel.getWorker().close(clientChannel, succeededFuture(channel)); + } + int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); + if (connectTimeout > 0) { + clientChannel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + } + }); + } else if (channel instanceof AbstractNioChannel) { + registerTaskQueue.add(new Runnable() { + + @Override + public void run() { + try { + registerTask((AbstractNioChannel) channel, future); + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); + } + + } + }); + } else { + throw new UnsupportedOperationException("Unable to handle channel " + channel); + } + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } catch (Throwable t) { + future.setFailure(t); + fireExceptionCaught(channel, t); } - } + + } + /** * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for the {@link AbstractNioChannel}'s when they get registered * @@ -145,7 +206,7 @@ abstract class AbstractNioWorker implements Worker { */ private Selector start() { synchronized (startStopLock) { - if (!started) { + if (!started && selector == null) { // Open a selector if this worker didn't start yet. try { this.selector = Selector.open(); @@ -182,6 +243,7 @@ abstract class AbstractNioWorker implements Worker { @Override public void run() { thread = Thread.currentThread(); + long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); boolean shutdown = false; Selector selector = this.selector; @@ -236,6 +298,13 @@ abstract class AbstractNioWorker implements Worker { processWriteTaskQueue(); processSelectedKeys(selector.selectedKeys()); + // Handle connection timeout every 10 milliseconds approximately. + long currentTimeNanos = System.nanoTime(); + if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) { + lastConnectTimeoutCheckTimeNanos = currentTimeNanos; + processConnectTimeout(selector.keys(), currentTimeNanos); + } + // Exit the loop when there's nothing to handle. // The shutdown flag is used to delay the shutdown of this // loop to avoid excessive Selector creation when @@ -298,7 +367,7 @@ abstract class AbstractNioWorker implements Worker { * fashion even if the current Thread == IO Thread */ public void executeInIoThread(Runnable task, boolean alwaysAsync) { - if (!alwaysAsync && Thread.currentThread() == thread) { + if (!alwaysAsync && isIoThread()) { task.run(); } else { start(); @@ -334,7 +403,6 @@ abstract class AbstractNioWorker implements Worker { if (task == null) { break; } - task.run(); cleanUpCancelledKeys(); } @@ -354,8 +422,9 @@ abstract class AbstractNioWorker implements Worker { private void processSelectedKeys(Set selectedKeys) throws IOException { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); - i.remove(); + boolean removeKey = true; try { + int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { if (!read(k)) { @@ -366,9 +435,23 @@ abstract class AbstractNioWorker implements Worker { if ((readyOps & SelectionKey.OP_WRITE) != 0) { writeFromSelectorLoop(k); } + + if ((readyOps & SelectionKey.OP_ACCEPT) != 0) { + removeKey = accept(k); + } + if ((readyOps & SelectionKey.OP_CONNECT) != 0) { + connect(k); + } + } catch (CancelledKeyException e) { close(k); + } finally { + if (removeKey) { + i.remove(); + } } + + if (cleanUpCancelledKeys()) { break; // break the loop to avoid ConcurrentModificationException @@ -376,6 +459,92 @@ abstract class AbstractNioWorker implements Worker { } } + private boolean accept(SelectionKey key) { + NioServerSocketChannel channel = (NioServerSocketChannel) key.attachment(); + try { + SocketChannel acceptedSocket = channel.socket.accept(); + if (acceptedSocket != null) { + + // TODO: Remove the casting stuff + ChannelPipeline pipeline = + channel.getConfig().getPipelineFactory().getPipeline(); + registerTask(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel, + channel.getPipeline().getSink(), acceptedSocket, (NioWorker) this), null); + return true; + } + return false; + } catch (SocketTimeoutException e) { + // Thrown every second to get ClosedChannelException + // raised. + } catch (CancelledKeyException e) { + // Raised by accept() when the server socket was closed. + } catch (ClosedSelectorException e) { + // Raised by accept() when the server socket was closed. + } catch (ClosedChannelException e) { + // Closed as requested. + } catch (Throwable e) { + if (logger.isWarnEnabled()) { + logger.warn( + "Failed to accept a connection.", e); + } + } + return true; + } + + + private void processConnectTimeout(Set keys, long currentTimeNanos) { + ConnectException cause = null; + for (SelectionKey k: keys) { + if (!k.isValid()) { + // Comment the close call again as it gave us major problems with ClosedChannelExceptions. + // + // See: + // * https://github.com/netty/netty/issues/142 + // * https://github.com/netty/netty/issues/138 + // + //close(k); + continue; + } + + // Something is ready so skip it + if (k.readyOps() != 0) { + continue; + } + // check if the channel is in + Object attachment = k.attachment(); + if (attachment instanceof NioClientSocketChannel) { + NioClientSocketChannel ch = (NioClientSocketChannel) attachment; + if (!ch.isConnected() && ch.connectDeadlineNanos > 0 && currentTimeNanos >= ch.connectDeadlineNanos) { + + if (cause == null) { + cause = new ConnectException("connection timed out"); + } + + ch.connectFuture.setFailure(cause); + fireExceptionCaught(ch, cause); + ch.getWorker().close(ch, succeededFuture(ch)); + } + } + + + + } + } + + private void connect(SelectionKey k) { + NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); + try { + if (ch.channel.isConnectionPending() && ch.channel.finishConnect()) { + registerTask(ch, ch.connectFuture); + } + } catch (Throwable t) { + ch.connectFuture.setFailure(t); + fireExceptionCaught(ch, t); + k.cancel(); // Some JDK implementations run into an infinite loop without this. + ch.getWorker().close(ch, succeededFuture(ch)); + } + } + private boolean cleanUpCancelledKeys() throws IOException { if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; @@ -388,8 +557,16 @@ abstract class AbstractNioWorker implements Worker { private void close(SelectionKey k) { - AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); - close(ch, succeededFuture(ch)); + Object attachment = k.attachment(); + if (attachment instanceof AbstractNioChannel) { + AbstractNioChannel ch = (AbstractNioChannel) attachment; + close(ch, succeededFuture(ch)); + } else if (attachment instanceof NioServerSocketChannel) { + NioServerSocketChannel ch = (NioServerSocketChannel) attachment; + close(ch, succeededFuture(ch)); + } else { + // TODO: What todo ? + } } void writeFromUserCode(final AbstractNioChannel channel) { @@ -397,10 +574,9 @@ abstract class AbstractNioWorker implements Worker { cleanUpWriteBuffer(channel); return; } - if (scheduleWriteIfNecessary(channel)) { return; - } + } // From here, we are sure Thread.currentThread() == workerThread. @@ -434,7 +610,7 @@ abstract class AbstractNioWorker implements Worker { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; - boolean iothread = isIoThread(channel); + boolean iothread = isIoThread(); long writtenBytes = 0; @@ -539,8 +715,12 @@ abstract class AbstractNioWorker implements Worker { } } - static boolean isIoThread(AbstractNioChannel channel) { - return Thread.currentThread() == channel.worker.thread; + /** + * Return true if the current executing thread is the same as the one that runs the {@link #run()} method + * + */ + boolean isIoThread() { + return Thread.currentThread() == thread; } private void setOpWrite(AbstractNioChannel channel) { @@ -590,10 +770,59 @@ abstract class AbstractNioWorker implements Worker { } + void close(NioServerSocketChannel channel, ChannelFuture future) { + boolean isIoThread = isIoThread(); + + boolean bound = channel.isBound(); + try { + if (channel.socket.isOpen()) { + channel.socket.close(); + Selector selector = channel.selector; + if (selector != null) { + selector.wakeup(); + } + } + + // Make sure the boss thread is not running so that that the future + // is notified after a new connection cannot be accepted anymore. + // See NETTY-256 for more information. + channel.shutdownLock.lock(); + try { + if (channel.setClosed()) { + future.setSuccess(); + if (bound) { + if (isIoThread) { + fireChannelUnbound(channel); + } else { + fireChannelUnboundLater(channel); + } + } + if (isIoThread) { + fireChannelClosed(channel); + } else { + fireChannelClosedLater(channel); + } + } else { + future.setSuccess(); + } + } finally { + channel.shutdownLock.unlock(); + } + } catch (Throwable t) { + future.setFailure(t); + if (isIoThread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + + } + } + } + void close(AbstractNioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); - boolean iothread = isIoThread(channel); + boolean iothread = isIoThread(); try { channel.channel.close(); @@ -630,6 +859,7 @@ abstract class AbstractNioWorker implements Worker { if (iothread) { fireExceptionCaught(channel, t); } else { + System.out.println(thread + "==" + channel.getWorker().thread); fireExceptionCaughtLater(channel, t); } } @@ -684,7 +914,7 @@ abstract class AbstractNioWorker implements Worker { } if (fireExceptionCaught) { - if (isIoThread(channel)) { + if (isIoThread()) { fireExceptionCaught(channel, cause); } else { fireExceptionCaughtLater(channel, cause); @@ -694,7 +924,7 @@ abstract class AbstractNioWorker implements Worker { void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { boolean changed = false; - boolean iothread = isIoThread(channel); + boolean iothread = isIoThread(); try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. @@ -731,7 +961,7 @@ abstract class AbstractNioWorker implements Worker { case 0: if (channel.getRawInterestOps() != interestOps) { key.interestOps(interestOps); - if (Thread.currentThread() != thread && + if (!iothread && wakenUp.compareAndSet(false, true)) { selector.wakeup(); } @@ -741,7 +971,7 @@ abstract class AbstractNioWorker implements Worker { case 1: case 2: if (channel.getRawInterestOps() != interestOps) { - if (Thread.currentThread() == thread) { + if (iothread) { key.interestOps(interestOps); changed = true; } else { @@ -803,13 +1033,6 @@ abstract class AbstractNioWorker implements Worker { */ protected abstract boolean read(SelectionKey k); - /** - * Create a new {@link Runnable} which will register the {@link AbstractNioWorker} with the {@link Channel} - * - * @param channel - * @param future - * @return task - */ - protected abstract Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future); + protected abstract void registerTask(AbstractNioChannel channel, ChannelFuture future); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioAcceptedSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioAcceptedSocketChannel.java index a4b7803cfe..bec5114f8b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioAcceptedSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioAcceptedSocketChannel.java @@ -26,13 +26,12 @@ import io.netty.channel.ChannelSink; final class NioAcceptedSocketChannel extends NioSocketChannel { - final Thread bossThread; static NioAcceptedSocketChannel create(ChannelFactory factory, ChannelPipeline pipeline, Channel parent, ChannelSink sink, - SocketChannel socket, NioWorker worker, Thread bossThread) { + SocketChannel socket, NioWorker worker) { NioAcceptedSocketChannel instance = new NioAcceptedSocketChannel( - factory, pipeline, parent, sink, socket, worker, bossThread); + factory, pipeline, parent, sink, socket, worker); instance.setConnected(); fireChannelOpen(instance); return instance; @@ -41,10 +40,8 @@ final class NioAcceptedSocketChannel extends NioSocketChannel { private NioAcceptedSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, Channel parent, ChannelSink sink, - SocketChannel socket, NioWorker worker, Thread bossThread) { + SocketChannel socket, NioWorker worker) { super(parent, factory, pipeline, sink, socket, worker); - - this.bossThread = bossThread; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java new file mode 100644 index 0000000000..522dcebbd6 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioChannel.java @@ -0,0 +1,23 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import io.netty.channel.Channel; + +public interface NioChannel extends Channel { + + AbstractNioWorker getWorker(); +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java index a83f81a084..4777e95bc4 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketChannelFactory.java @@ -26,7 +26,6 @@ import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.ClientSocketChannelFactory; import io.netty.channel.socket.SocketChannel; import io.netty.util.ExternalResourceReleasable; -import io.netty.util.internal.ExecutorUtil; /** * A {@link ClientSocketChannelFactory} which creates a client-side NIO-based @@ -80,19 +79,17 @@ import io.netty.util.internal.ExecutorUtil; */ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory { - private static final int DEFAULT_BOSS_COUNT = 1; - private final Executor bossExecutor; private final WorkerPool workerPool; private final NioClientSocketPipelineSink sink; /** - * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} for the worker and boss executors. + * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} for the worker executor. * * See {@link #NioClientSocketChannelFactory(Executor, Executor)} */ public NioClientSocketChannelFactory() { - this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); + this(Executors.newCachedThreadPool()); } /** @@ -102,74 +99,37 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory * bossCount and workerCount respectively. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * - * @param bossExecutor - * the {@link Executor} which will execute the boss thread * @param workerExecutor * the {@link Executor} which will execute the worker threads */ - public NioClientSocketChannelFactory( - Executor bossExecutor, Executor workerExecutor) { - this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, SelectorUtil.DEFAULT_IO_THREADS); + public NioClientSocketChannelFactory(Executor workerExecutor) { + this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); } /** * Creates a new instance. Calling this constructor is same with calling - * {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with + * {@link #NioClientSocketChannelFactory(Executor, int, int)} with * 1 as bossCount. * - * @param bossExecutor - * the {@link Executor} which will execute the boss thread * @param workerExecutor * the {@link Executor} which will execute the worker threads * @param workerCount * the maximum number of worker threads */ - public NioClientSocketChannelFactory( - Executor bossExecutor, Executor workerExecutor, + public NioClientSocketChannelFactory(Executor workerExecutor, int workerCount) { - this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount); - } - - /** - * Creates a new instance. - * - * @param bossExecutor - * the {@link Executor} which will execute the boss thread - * @param workerExecutor - * the {@link Executor} which will execute the I/O worker threads - * @param bossCount - * the maximum number of boss threads - * @param workerCount - * the maximum number of worker threads - */ - public NioClientSocketChannelFactory( - Executor bossExecutor, Executor workerExecutor, - int bossCount, int workerCount) { - - this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount, true)); + this(new NioWorkerPool(workerExecutor, workerCount, true)); } - public NioClientSocketChannelFactory( - Executor bossExecutor, int bossCount, - WorkerPool workerPool) { + public NioClientSocketChannelFactory(WorkerPool workerPool) { - if (bossExecutor == null) { - throw new NullPointerException("bossExecutor"); - } if (workerPool == null) { throw new NullPointerException("workerPool"); } - if (bossCount <= 0) { - throw new IllegalArgumentException( - "bossCount (" + bossCount + ") " + - "must be a positive integer."); - } - this.bossExecutor = bossExecutor; this.workerPool = workerPool; - sink = new NioClientSocketPipelineSink( - bossExecutor, bossCount, workerPool); + sink = new NioClientSocketPipelineSink(workerPool); } @@ -180,7 +140,6 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory @Override public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor); if (workerPool instanceof ExternalResourceReleasable) { ((ExternalResourceReleasable) workerPool).releaseExternalResources(); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 68ec7b4351..62e1bc4ec4 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -15,24 +15,10 @@ */ package io.netty.channel.socket.nio; -import static io.netty.channel.Channels.*; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketAddress; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - +import static io.netty.channel.Channels.fireChannelBound; +import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.succeededFuture; import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPipeline; @@ -41,31 +27,18 @@ import io.netty.channel.ChannelStateEvent; import io.netty.channel.MessageEvent; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.DeadLockProofWorker; -import io.netty.util.internal.QueueFactory; + +import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; class NioClientSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); - final Executor bossExecutor; - - private final Boss[] bosses; - private final AtomicInteger bossIndex = new AtomicInteger(); - private final WorkerPool workerPool; - NioClientSocketPipelineSink( - Executor bossExecutor, int bossCount, WorkerPool workerPool) { - - this.bossExecutor = bossExecutor; - - bosses = new Boss[bossCount]; - for (int i = 0; i < bosses.length; i ++) { - bosses[i] = new Boss(); - } - + NioClientSocketPipelineSink(WorkerPool workerPool) { this.workerPool = workerPool; } @@ -83,25 +56,25 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case CONNECTED: if (value != null) { connect(channel, future, (SocketAddress) value); } else { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case INTEREST_OPS: - channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); + channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue()); break; } } else if (e instanceof MessageEvent) { @@ -109,7 +82,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBufferQueue.offer(event); assert offered; - channel.worker.writeFromUserCode(channel); + channel.getWorker().writeFromUserCode(channel); } } @@ -132,307 +105,31 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { final NioClientSocketChannel channel, final ChannelFuture cf, SocketAddress remoteAddress) { try { - if (channel.channel.connect(remoteAddress)) { - channel.worker.register(channel, cf); - } else { - channel.getCloseFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) - throws Exception { - if (!cf.isDone()) { - cf.setFailure(new ClosedChannelException()); - } + channel.channel.connect(remoteAddress); + + channel.getCloseFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) + throws Exception { + if (!cf.isDone()) { + cf.setFailure(new ClosedChannelException()); } - }); - cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - channel.connectFuture = cf; - nextBoss().register(channel); - } + } + }); + cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + channel.connectFuture = cf; + nextWorker().registerWithWorker(channel, cf); + //nextBoss().register(channel); + } catch (Throwable t) { cf.setFailure(t); fireExceptionCaught(channel, t); - channel.worker.close(channel, succeededFuture(channel)); + channel.getWorker().close(channel, succeededFuture(channel)); } } NioWorker nextWorker() { return workerPool.nextWorker(); } - - Boss nextBoss() { - return bosses[Math.abs( - bossIndex.getAndIncrement() % bosses.length)]; - } - - private final class Boss implements Runnable { - - volatile Selector selector; - private boolean started; - private final AtomicBoolean wakenUp = new AtomicBoolean(); - private final Object startStopLock = new Object(); - private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); - - Boss() { - } - - void register(NioClientSocketChannel channel) { - Runnable registerTask = new RegisterTask(this, channel); - Selector selector; - - synchronized (startStopLock) { - if (!started) { - // Open a selector if this worker didn't start yet. - try { - this.selector = selector = Selector.open(); - } catch (Throwable t) { - throw new ChannelException( - "Failed to create a selector.", t); - } - - // Start the worker thread with the new Selector. - boolean success = false; - try { - DeadLockProofWorker.start(bossExecutor, this); - success = true; - } finally { - if (!success) { - // Release the Selector if the execution fails. - try { - selector.close(); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a selector.", t); - } - } - this.selector = selector = null; - // The method will return to the caller at this point. - } - } - } else { - // Use the existing selector if this worker has been started. - selector = this.selector; - } - - assert selector != null && selector.isOpen(); - - started = true; - boolean offered = registerTaskQueue.offer(registerTask); - assert offered; - } - - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } - - @Override - public void run() { - boolean shutdown = false; - Selector selector = this.selector; - long lastConnectTimeoutCheckTimeNanos = System.nanoTime(); - for (;;) { - wakenUp.set(false); - - try { - int selectedKeyCount = selector.select(10); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - selector.wakeup(); - } - - processRegisterTaskQueue(); - - if (selectedKeyCount > 0) { - processSelectedKeys(selector.selectedKeys()); - } - - // Handle connection timeout every 10 milliseconds approximately. - long currentTimeNanos = System.nanoTime(); - if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) { - lastConnectTimeoutCheckTimeNanos = currentTimeNanos; - processConnectTimeout(selector.keys(), currentTimeNanos); - } - - // Exit the loop when there's nothing to handle. - // The shutdown flag is used to delay the shutdown of this - // loop to avoid excessive Selector creation when - // connection attempts are made in a one-by-one manner - // instead of concurrent manner. - if (selector.keys().isEmpty()) { - if (shutdown || - bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) { - - synchronized (startStopLock) { - if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { - started = false; - try { - selector.close(); - } catch (IOException e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a selector.", e); - } - - } finally { - this.selector = null; - } - break; - } else { - shutdown = false; - } - } - } else { - // Give one more second. - shutdown = true; - } - } else { - shutdown = false; - } - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "Unexpected exception in the selector loop.", t); - } - - - // Prevent possible consecutive immediate failures. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } - } - - private void processRegisterTaskQueue() { - for (;;) { - final Runnable task = registerTaskQueue.poll(); - if (task == null) { - break; - } - - task.run(); - } - } - - private void processSelectedKeys(Set selectedKeys) { - for (Iterator i = selectedKeys.iterator(); i.hasNext();) { - SelectionKey k = i.next(); - i.remove(); - - if (!k.isValid()) { - close(k); - continue; - } - - if (k.isConnectable()) { - connect(k); - } - } - } - - private void processConnectTimeout(Set keys, long currentTimeNanos) { - ConnectException cause = null; - for (SelectionKey k: keys) { - if (!k.isValid()) { - // Comment the close call again as it gave us major problems with ClosedChannelExceptions. - // - // See: - // * https://github.com/netty/netty/issues/142 - // * https://github.com/netty/netty/issues/138 - // - //close(k); - continue; - } - - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); - if (ch.connectDeadlineNanos > 0 && - currentTimeNanos >= ch.connectDeadlineNanos) { - - if (cause == null) { - cause = new ConnectException("connection timed out"); - } - - ch.connectFuture.setFailure(cause); - fireExceptionCaught(ch, cause); - ch.worker.close(ch, succeededFuture(ch)); - } - } - } - - private void connect(SelectionKey k) { - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); - try { - if (ch.channel.finishConnect()) { - k.cancel(); - ch.worker.register(ch, ch.connectFuture); - } - } catch (Throwable t) { - ch.connectFuture.setFailure(t); - fireExceptionCaught(ch, t); - k.cancel(); // Some JDK implementations run into an infinite loop without this. - ch.worker.close(ch, succeededFuture(ch)); - } - } - - private void close(SelectionKey k) { - NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); - ch.worker.close(ch, succeededFuture(ch)); - } - } - - private static final class RegisterTask implements Runnable { - private final Boss boss; - private final NioClientSocketChannel channel; - - RegisterTask(Boss boss, NioClientSocketChannel channel) { - this.boss = boss; - this.channel = channel; - } - - @Override - public void run() { - try { - channel.channel.register( - boss.selector, SelectionKey.OP_CONNECT, channel); - } catch (ClosedChannelException e) { - channel.worker.close(channel, succeededFuture(channel)); - } - - int connectTimeout = channel.getConfig().getConnectTimeoutMillis(); - if (connectTimeout > 0) { - channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L; - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java index 32fc568a42..78fcc34388 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -70,32 +70,32 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (InetSocketAddress) value); } else { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case CONNECTED: if (value != null) { connect(channel, future, (InetSocketAddress) value); } else { - NioDatagramWorker.disconnect(channel, future); + channel.getWorker().disconnect(channel, future); } break; case INTEREST_OPS: - channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); + channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue()); break; } } else if (e instanceof MessageEvent) { final MessageEvent event = (MessageEvent) e; final boolean offered = channel.writeBufferQueue.offer(event); assert offered; - channel.worker.writeFromUserCode(channel); + channel.getWorker().writeFromUserCode(channel); } } @@ -133,7 +133,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { future.setSuccess(); fireChannelBound(channel, address); - channel.worker.register(channel, null); + channel.getWorker().registerWithWorker(channel, null); started = true; } catch (final Throwable t) { future.setFailure(t); @@ -171,16 +171,15 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink { fireChannelConnected(channel, channel.getRemoteAddress()); if (!bound) { - channel.worker.register(channel, future); + channel.getWorker().registerWithWorker(channel, future); } - workerStarted = true; } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); } finally { if (connected && !workerStarted) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index 31f9c44f84..93594bff80 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -109,8 +109,7 @@ public class NioDatagramWorker extends AbstractNioWorker { @Override protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { - final Thread workerThread = thread; - if (workerThread == null || Thread.currentThread() != workerThread) { + if (!isIoThread()) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { // "add" the channels writeTask to the writeTaskQueue. boolean offered = writeTaskQueue.offer(channel.writeTask); @@ -130,9 +129,9 @@ public class NioDatagramWorker extends AbstractNioWorker { } - static void disconnect(NioDatagramChannel channel, ChannelFuture future) { + void disconnect(NioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); - boolean iothread = isIoThread(channel); + boolean iothread = isIoThread(); try { channel.getDatagramChannel().disconnect(); future.setSuccess(); @@ -155,57 +154,33 @@ public class NioDatagramWorker extends AbstractNioWorker { @Override - protected Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future) { - return new ChannelRegistionTask((NioDatagramChannel) channel, future); - } - - /** - * RegisterTask is a task responsible for registering a channel with a - * selector. - */ - private final class ChannelRegistionTask implements Runnable { - private final NioDatagramChannel channel; - - private final ChannelFuture future; - - ChannelRegistionTask(final NioDatagramChannel channel, - final ChannelFuture future) { - this.channel = channel; - this.future = future; + protected void registerTask(AbstractNioChannel channel, ChannelFuture future) { + final SocketAddress localAddress = channel.getLocalAddress(); + if (localAddress == null) { + if (future != null) { + future.setFailure(new ClosedChannelException()); + } + close(channel, succeededFuture(channel)); + return; } - /** - * This runnable's task. Does the actual registering by calling the - * underlying DatagramChannels peer DatagramSocket register method. - */ - @Override - public void run() { - final SocketAddress localAddress = channel.getLocalAddress(); - if (localAddress == null) { - if (future != null) { - future.setFailure(new ClosedChannelException()); - } - close(channel, succeededFuture(channel)); - return; + try { + synchronized (channel.interestOpsLock) { + ((NioDatagramChannel) channel).getDatagramChannel().register( + selector, channel.getRawInterestOps(), channel); } - - try { - synchronized (channel.interestOpsLock) { - channel.getDatagramChannel().register( - selector, channel.getRawInterestOps(), channel); - } - if (future != null) { - future.setSuccess(); - } - } catch (final ClosedChannelException e) { - if (future != null) { - future.setFailure(e); - } - close(channel, succeededFuture(channel)); - throw new ChannelException( - "Failed to register a socket to the selector.", e); + if (future != null) { + future.setSuccess(); } + } catch (final ClosedChannelException e) { + if (future != null) { + future.setFailure(e); + } + close(channel, succeededFuture(channel)); + throw new ChannelException( + "Failed to register a socket to the selector.", e); } } + } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 4c96469ac9..9ede4ef802 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -35,7 +35,7 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; final class NioServerSocketChannel extends AbstractServerChannel - implements io.netty.channel.socket.ServerSocketChannel { + implements io.netty.channel.socket.ServerSocketChannel, NioChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class); @@ -43,12 +43,15 @@ final class NioServerSocketChannel extends AbstractServerChannel final ServerSocketChannel socket; final Lock shutdownLock = new ReentrantLock(); volatile Selector selector; + final NioWorker worker; + + private final ServerSocketChannelConfig config; static NioServerSocketChannel create(ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink) { + ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) { NioServerSocketChannel instance = - new NioServerSocketChannel(factory, pipeline, sink); + new NioServerSocketChannel(factory, pipeline, sink, worker); fireChannelOpen(instance); return instance; } @@ -56,10 +59,10 @@ final class NioServerSocketChannel extends AbstractServerChannel private NioServerSocketChannel( ChannelFactory factory, ChannelPipeline pipeline, - ChannelSink sink) { + ChannelSink sink, NioWorker worker) { super(factory, pipeline, sink); - + this.worker = worker; try { socket = ServerSocketChannel.open(); } catch (IOException e) { @@ -110,4 +113,9 @@ final class NioServerSocketChannel extends AbstractServerChannel protected boolean setClosed() { return super.setClosed(); } + + @Override + public NioWorker getWorker() { + return worker; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java index 4ba4a8b00e..e1a88875fd 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannelFactory.java @@ -22,13 +22,11 @@ import java.util.concurrent.RejectedExecutionException; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannelFactory; import io.netty.channel.socket.Worker; import io.netty.util.ExternalResourceReleasable; -import io.netty.util.internal.ExecutorUtil; /** * A {@link ServerSocketChannelFactory} which creates a server-side NIO-based @@ -85,18 +83,17 @@ import io.netty.util.internal.ExecutorUtil; */ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { - final Executor bossExecutor; private final WorkerPool workerPool; - private final ChannelSink sink; + private final NioServerSocketPipelineSink sink; /** * Create a new {@link NioServerSocketChannelFactory} using - * {@link Executors#newCachedThreadPool()} for the boss and worker. + * {@link Executors#newCachedThreadPool()} for the worker. * * See {@link #NioServerSocketChannelFactory(Executor, Executor)} */ public NioServerSocketChannelFactory() { - this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); + this(Executors.newCachedThreadPool()); } @@ -106,50 +103,38 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory * the number of available processors in the machine. The number of * available processors is obtained by {@link Runtime#availableProcessors()}. * - * @param bossExecutor - * the {@link Executor} which will execute the boss threads + * @param workerExecutor * the {@link Executor} which will execute the I/O worker threads */ - public NioServerSocketChannelFactory( - Executor bossExecutor, Executor workerExecutor) { - this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); + public NioServerSocketChannelFactory(Executor workerExecutor) { + this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS); } /** * Creates a new instance. * - * @param bossExecutor - * the {@link Executor} which will execute the boss threads * @param workerExecutor * the {@link Executor} which will execute the I/O worker threads * @param workerCount * the maximum number of I/O worker threads */ - public NioServerSocketChannelFactory( - Executor bossExecutor, Executor workerExecutor, + public NioServerSocketChannelFactory(Executor workerExecutor, int workerCount) { - this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount, true)); + this(new NioWorkerPool(workerExecutor, workerCount, true)); } /** * Creates a new instance. * - * @param bossExecutor - * the {@link Executor} which will execute the boss threads * @param workerPool * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads */ - public NioServerSocketChannelFactory( - Executor bossExecutor, WorkerPool workerPool) { - if (bossExecutor == null) { - throw new NullPointerException("bossExecutor"); - } + public NioServerSocketChannelFactory(WorkerPool workerPool) { if (workerPool == null) { throw new NullPointerException("workerPool"); } - this.bossExecutor = bossExecutor; this.workerPool = workerPool; sink = new NioServerSocketPipelineSink(workerPool); } @@ -157,12 +142,11 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory @Override public ServerSocketChannel newChannel(ChannelPipeline pipeline) { - return NioServerSocketChannel.create(this, pipeline, sink); + return NioServerSocketChannel.create(this, pipeline, sink, sink.nextWorker()); } @Override public void releaseExternalResources() { - ExecutorUtil.terminate(bossExecutor); if (workerPool instanceof ExternalResourceReleasable) { ((ExternalResourceReleasable) workerPool).releaseExternalResources(); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java index dd847310c5..82bb871099 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -17,16 +17,8 @@ package io.netty.channel.socket.nio; import static io.netty.channel.Channels.*; -import java.io.IOException; import java.net.SocketAddress; -import java.net.SocketTimeoutException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ClosedSelectorException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.concurrent.Executor; + import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; @@ -37,7 +29,6 @@ import io.netty.channel.ChannelStateEvent; import io.netty.channel.MessageEvent; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.DeadLockProofWorker; class NioServerSocketPipelineSink extends AbstractNioChannelSink { @@ -76,14 +67,14 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - close(channel, future); + channel.worker.close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { - close(channel, future); + channel.worker.close(channel, future); } break; } @@ -100,17 +91,17 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case BOUND: case CONNECTED: if (value == null) { - channel.worker.close(channel, future); + channel.getWorker().close(channel, future); } break; case INTEREST_OPS: - channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); + channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue()); break; } } else if (e instanceof MessageEvent) { @@ -118,7 +109,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBufferQueue.offer(event); assert offered; - channel.worker.writeFromUserCode(channel); + channel.getWorker().writeFromUserCode(channel); } } @@ -127,7 +118,6 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { SocketAddress localAddress) { boolean bound = false; - boolean bossStarted = false; try { channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); bound = true; @@ -135,157 +125,21 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { future.setSuccess(); fireChannelBound(channel, channel.getLocalAddress()); - Executor bossExecutor = - ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; - DeadLockProofWorker.start(bossExecutor, new Boss(channel)); - bossStarted = true; + nextWorker().registerWithWorker(channel, future); + } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); } finally { - if (!bossStarted && bound) { - close(channel, future); + if (!bound) { + channel.worker.close(channel, future); } } } - private void close(NioServerSocketChannel channel, ChannelFuture future) { - boolean bound = channel.isBound(); - try { - if (channel.socket.isOpen()) { - channel.socket.close(); - Selector selector = channel.selector; - if (selector != null) { - selector.wakeup(); - } - } - - // Make sure the boss thread is not running so that that the future - // is notified after a new connection cannot be accepted anymore. - // See NETTY-256 for more information. - channel.shutdownLock.lock(); - try { - if (channel.setClosed()) { - future.setSuccess(); - if (bound) { - fireChannelUnbound(channel); - } - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } finally { - channel.shutdownLock.unlock(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } NioWorker nextWorker() { return workerPool.nextWorker(); } - private final class Boss implements Runnable { - private final Selector selector; - private final NioServerSocketChannel channel; - - Boss(NioServerSocketChannel channel) throws IOException { - this.channel = channel; - - selector = Selector.open(); - - boolean registered = false; - try { - channel.socket.register(selector, SelectionKey.OP_ACCEPT); - registered = true; - } finally { - if (!registered) { - closeSelector(); - } - } - - channel.selector = selector; - } - - @Override - public void run() { - final Thread currentThread = Thread.currentThread(); - - channel.shutdownLock.lock(); - try { - for (;;) { - try { - if (selector.select(1000) > 0) { - selector.selectedKeys().clear(); - } - - SocketChannel acceptedSocket = channel.socket.accept(); - if (acceptedSocket != null) { - registerAcceptedChannel(acceptedSocket, currentThread); - } - } catch (SocketTimeoutException e) { - // Thrown every second to get ClosedChannelException - // raised. - } catch (CancelledKeyException e) { - // Raised by accept() when the server socket was closed. - } catch (ClosedSelectorException e) { - // Raised by accept() when the server socket was closed. - } catch (ClosedChannelException e) { - // Closed as requested. - break; - } catch (Throwable e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to accept a connection.", e); - } - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - // Ignore - } - } - } - } finally { - channel.shutdownLock.unlock(); - closeSelector(); - } - } - - private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { - try { - ChannelPipeline pipeline = - channel.getConfig().getPipelineFactory().getPipeline(); - NioWorker worker = nextWorker(); - worker.register(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel, - NioServerSocketPipelineSink.this, acceptedSocket, worker, currentThread), null); - } catch (Exception e) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to initialize an accepted socket.", e); - } - try { - acceptedSocket.close(); - } catch (IOException e2) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to close a partially accepted socket.", - e2); - } - } - } - } - - private void closeSelector() { - channel.selector = null; - try { - selector.close(); - } catch (Exception e) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to close a selector.", e); - } - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 9cce753d74..1cf44f987c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -23,7 +23,7 @@ import io.netty.channel.ChannelSink; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; -public class NioSocketChannel extends AbstractNioChannel +public abstract class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel { private static final int ST_OPEN = 0; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java index b68043df7e..861e32e6f4 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioWorker.java @@ -108,16 +108,13 @@ public class NioWorker extends AbstractNioWorker { @Override protected boolean scheduleWriteIfNecessary(final AbstractNioChannel channel) { - final Thread currentThread = Thread.currentThread(); - final Thread workerThread = thread; - if (currentThread != workerThread) { + if (!isIoThread()) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { boolean offered = writeTaskQueue.offer(channel.writeTask); assert offered; } - if (!(channel instanceof NioAcceptedSocketChannel) || - ((NioAcceptedSocketChannel) channel).bossThread != currentThread) { + if (!(channel instanceof NioAcceptedSocketChannel)) { final Selector workerSelector = selector; if (workerSelector != null) { if (wakenUp.compareAndSet(false, true)) { @@ -143,66 +140,52 @@ public class NioWorker extends AbstractNioWorker { } @Override - protected Runnable createRegisterTask(AbstractNioChannel channel, ChannelFuture future) { + protected void registerTask(AbstractNioChannel channel, ChannelFuture future) { boolean server = !(channel instanceof NioClientSocketChannel); - return new RegisterTask((NioSocketChannel) channel, future, server); - } - - private final class RegisterTask implements Runnable { - private final NioSocketChannel channel; - private final ChannelFuture future; - private final boolean server; + SocketAddress localAddress = channel.getLocalAddress(); + SocketAddress remoteAddress = channel.getRemoteAddress(); - RegisterTask( - NioSocketChannel channel, ChannelFuture future, boolean server) { - - this.channel = channel; - this.future = future; - this.server = server; + if (localAddress == null || remoteAddress == null) { + if (future != null) { + future.setFailure(new ClosedChannelException()); + } + close(channel, succeededFuture(channel)); + return; } - @Override - public void run() { - SocketAddress localAddress = channel.getLocalAddress(); - SocketAddress remoteAddress = channel.getRemoteAddress(); - - if (localAddress == null || remoteAddress == null) { - if (future != null) { - future.setFailure(new ClosedChannelException()); - } - close(channel, succeededFuture(channel)); - return; + try { + if (server) { + channel.channel.configureBlocking(false); } - try { - if (server) { - channel.channel.configureBlocking(false); - } - + boolean registered = channel.channel.isRegistered(); + if (!registered) { synchronized (channel.interestOpsLock) { channel.channel.register( selector, channel.getRawInterestOps(), channel); } - if (future != null) { - channel.setConnected(); - future.setSuccess(); - } - } catch (IOException e) { - if (future != null) { - future.setFailure(e); - } - close(channel, succeededFuture(channel)); - if (!(e instanceof ClosedChannelException)) { - throw new ChannelException( - "Failed to register a socket to the selector.", e); - } + } - - if (server || !((NioClientSocketChannel) channel).boundManually) { - fireChannelBound(channel, localAddress); + if (future != null) { + ((NioSocketChannel) channel).setConnected(); + future.setSuccess(); + } + + } catch (IOException e) { + if (future != null) { + future.setFailure(e); + } + close(channel, succeededFuture(channel)); + if (!(e instanceof ClosedChannelException)) { + throw new ChannelException( + "Failed to register a socket to the selector.", e); } - fireChannelConnected(channel, remoteAddress); } + + if (server || !((NioClientSocketChannel) channel).boundManually) { + fireChannelBound(channel, localAddress); + } + fireChannelConnected(channel, remoteAddress); } }