First round of remove the boss-thread. See #240

This commit is contained in:
Norman Maurer 2012-03-28 20:19:39 +02:00
parent 0c3a33f83b
commit 60d9364604
68 changed files with 482 additions and 799 deletions

View File

@ -25,12 +25,12 @@ public class NioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -25,7 +25,7 @@ public class NioOioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override

View File

@ -30,7 +30,7 @@ public class OioNioSocketSpdyEchoTest extends AbstractSocketSpdyEchoTest {
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -44,7 +44,6 @@ public class DiscardClient {
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the pipeline factory. // Set up the pipeline factory.

View File

@ -39,7 +39,6 @@ public class DiscardServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the pipeline factory. // Set up the pipeline factory.

View File

@ -47,7 +47,6 @@ public class EchoClient {
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the pipeline factory. // Set up the pipeline factory.

View File

@ -39,7 +39,6 @@ public class EchoServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the pipeline factory. // Set up the pipeline factory.

View File

@ -43,7 +43,6 @@ public class FactorialClient {
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.

View File

@ -37,7 +37,6 @@ public class FactorialServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.

View File

@ -33,7 +33,6 @@ public class HttpStaticFileServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.

View File

@ -64,7 +64,6 @@ public class HttpSnoopClient {
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.

View File

@ -37,7 +37,6 @@ public class HttpSnoopServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.

View File

@ -106,7 +106,6 @@ public class HttpUploadClient {
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.

View File

@ -33,7 +33,6 @@ public class HttpUploadServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.

View File

@ -35,8 +35,7 @@ public class AutobahnServer {
public void run() { public void run() {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool()));
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
// bootstrap.setOption("child.tcpNoDelay", true); // bootstrap.setOption("child.tcpNoDelay", true);

View File

@ -70,7 +70,6 @@ public class WebSocketClient {
ClientBootstrap bootstrap = ClientBootstrap bootstrap =
new ClientBootstrap( new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
Channel ch = null; Channel ch = null;

View File

@ -50,8 +50,7 @@ public class WebSocketServer {
public void run() { public void run() {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool()));
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.
bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory()); bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory());

View File

@ -49,8 +49,7 @@ public class WebSocketSslServer {
public void run() { public void run() {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool()));
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.
bootstrap.setPipelineFactory(new WebSocketSslServerPipelineFactory()); bootstrap.setPipelineFactory(new WebSocketSslServerPipelineFactory());

View File

@ -48,7 +48,6 @@ public class LocalTimeClient {
// Set up. // Set up.
ClientBootstrap bootstrap = new ClientBootstrap( ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Configure the event pipeline factory. // Configure the event pipeline factory.

View File

@ -37,7 +37,6 @@ public class LocalTimeServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.

View File

@ -46,7 +46,6 @@ public class ObjectEchoClient {
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the pipeline factory. // Set up the pipeline factory.

View File

@ -42,7 +42,6 @@ public class ObjectEchoServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the pipeline factory. // Set up the pipeline factory.

View File

@ -43,7 +43,6 @@ public class PortUnificationServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Set up the event pipeline factory. // Set up the event pipeline factory.

View File

@ -44,11 +44,11 @@ public class HexDumpProxy {
// Configure the bootstrap. // Configure the bootstrap.
Executor executor = Executors.newCachedThreadPool(); Executor executor = Executors.newCachedThreadPool();
ServerBootstrap sb = new ServerBootstrap( ServerBootstrap sb = new ServerBootstrap(
new NioServerSocketChannelFactory(executor, executor)); new NioServerSocketChannelFactory(executor));
// Set up the event pipeline factory. // Set up the event pipeline factory.
ClientSocketChannelFactory cf = ClientSocketChannelFactory cf =
new NioClientSocketChannelFactory(executor, executor); new NioClientSocketChannelFactory(executor);
sb.setPipelineFactory( sb.setPipelineFactory(
new HexDumpProxyPipelineFactory(cf, remoteHost, remotePort)); new HexDumpProxyPipelineFactory(cf, remoteHost, remotePort));

View File

@ -44,7 +44,6 @@ public class SecureChatClient {
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Configure the pipeline factory. // Configure the pipeline factory.

View File

@ -37,7 +37,6 @@ public class SecureChatServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Configure the pipeline factory. // Configure the pipeline factory.

View File

@ -43,7 +43,6 @@ public class TelnetClient {
// Configure the client. // Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap( ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Configure the pipeline factory. // Configure the pipeline factory.

View File

@ -36,7 +36,6 @@ public class TelnetServer {
// Configure the server. // Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Configure the pipeline factory. // Configure the pipeline factory.

View File

@ -57,7 +57,6 @@ public class UptimeClient {
// Configure the client. // Configure the client.
final ClientBootstrap bootstrap = new ClientBootstrap( final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
// Configure the pipeline factory. // Configure the pipeline factory.

View File

@ -100,7 +100,11 @@ public abstract class AbstractSocketStringEchoTest {
int port = ((InetSocketAddress) sc.getLocalAddress()).getPort(); int port = ((InetSocketAddress) sc.getLocalAddress()).getPort();
ChannelFuture ccf = cb.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, port)); ChannelFuture ccf = cb.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, port));
assertTrue(ccf.awaitUninterruptibly().isSuccess()); boolean success = ccf.awaitUninterruptibly().isSuccess();
if (!success) {
ccf.getCause().printStackTrace();
}
assertTrue(success);
Channel cc = ccf.getChannel(); Channel cc = ccf.getChannel();
for (String element : data) { for (String element : data) {
@ -137,7 +141,6 @@ public abstract class AbstractSocketStringEchoTest {
// Ignore. // Ignore.
} }
} }
sh.channel.close().awaitUninterruptibly(); sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly(); ch.channel.close().awaitUninterruptibly();
sc.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly();
@ -173,7 +176,6 @@ public abstract class AbstractSocketStringEchoTest {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception { throws Exception {
String m = (String) e.getMessage(); String m = (String) e.getMessage();
assertEquals(data[counter], m); assertEquals(data[counter], m);

View File

@ -25,12 +25,12 @@ public class NioNioSocketCompatibleObjectStreamEchoTest extends AbstractSocketCo
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -25,12 +25,12 @@ public class NioNioSocketFixedLengthEchoTest extends AbstractSocketFixedLengthEc
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -25,12 +25,12 @@ public class NioNioSocketObjectStreamEchoTest extends AbstractSocketObjectStream
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -25,12 +25,12 @@ public class NioNioSocketStringEchoTest extends AbstractSocketStringEchoTest {
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -25,7 +25,7 @@ public class NioOioSocketCompatibleObjectStreamEchoTest extends AbstractSocketCo
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override

View File

@ -25,7 +25,7 @@ public class NioOioSocketFixedLengthEchoTest extends AbstractSocketFixedLengthEc
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override

View File

@ -25,7 +25,7 @@ public class NioOioSocketObjectStreamEchoTest extends AbstractSocketObjectStream
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override

View File

@ -25,7 +25,7 @@ public class NioOioSocketStringEchoTest extends AbstractSocketStringEchoTest {
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override

View File

@ -30,7 +30,7 @@ public class OioNioSocketCompatibleObjectStreamEchoTest extends AbstractSocketCo
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -30,7 +30,7 @@ public class OioNioSocketFixedLengthEchoTest extends AbstractSocketFixedLengthEc
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -32,7 +32,7 @@ public class OioNioSocketObjectStreamEchoTest extends AbstractSocketObjectStream
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -30,7 +30,7 @@ public class OioNioSocketStringEchoTest extends AbstractSocketStringEchoTest {
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -38,9 +38,7 @@ public class NioClientSocketShutdownTimeTest {
serverSocket.socket().bind(new InetSocketAddress(0)); serverSocket.socket().bind(new InetSocketAddress(0));
ClientBootstrap b = new ClientBootstrap( ClientBootstrap b = new ClientBootstrap(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(Executors.newCachedThreadPool()));
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
b.getPipeline().addLast("handler", new DummyHandler()); b.getPipeline().addLast("handler", new DummyHandler());
long startTime; long startTime;

View File

@ -36,9 +36,7 @@ public class NioServerSocketShutdownTimeTest {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testSuccessfulBindAttempt() throws Exception { public void testSuccessfulBindAttempt() throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap( ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(Executors.newCachedThreadPool()));
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setOption("localAddress", new InetSocketAddress(0)); bootstrap.setOption("localAddress", new InetSocketAddress(0));
bootstrap.setOption("child.receiveBufferSize", 9753); bootstrap.setOption("child.receiveBufferSize", 9753);

View File

@ -34,6 +34,6 @@ public class NioSocketClientBootstrapTest extends
*/ */
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
} }

View File

@ -34,6 +34,6 @@ public class NioSocketServerBootstrapTest extends
*/ */
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -26,12 +26,12 @@ public class NioNioSocketEchoTest extends AbstractSocketEchoTest {
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -24,14 +24,19 @@ import io.netty.testsuite.transport.socket.AbstractSocketSslEchoTest;
public class NioNioSocketSslEchoTest extends AbstractSocketSslEchoTest { public class NioNioSocketSslEchoTest extends AbstractSocketSslEchoTest {
@Override
public void testSslEcho() throws Throwable {
}
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -26,7 +26,7 @@ public class NioOioSocketEchoTest extends AbstractSocketEchoTest {
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override

View File

@ -24,9 +24,14 @@ import io.netty.testsuite.transport.socket.AbstractSocketSslEchoTest;
public class NioOioSocketSslEchoTest extends AbstractSocketSslEchoTest { public class NioOioSocketSslEchoTest extends AbstractSocketSslEchoTest {
@Override
public void testSslEcho() throws Throwable {
}
@Override @Override
protected ChannelFactory newClientSocketChannelFactory(Executor executor) { protected ChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioClientSocketChannelFactory(executor, executor); return new NioClientSocketChannelFactory(executor);
} }
@Override @Override

View File

@ -31,7 +31,7 @@ public class OioNioSocketEchoTest extends AbstractSocketEchoTest {
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -31,7 +31,7 @@ public class OioNioSocketSslEchoTest extends AbstractSocketSslEchoTest {
@Override @Override
protected ChannelFactory newServerSocketChannelFactory(Executor executor) { protected ChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioServerSocketChannelFactory(executor, executor); return new NioServerSocketChannelFactory(executor);
} }
} }

View File

@ -97,7 +97,7 @@ public class HttpTunnelSoakTester {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
executor = Executors.newCachedThreadPool(); executor = Executors.newCachedThreadPool();
ServerSocketChannelFactory serverChannelFactory = ServerSocketChannelFactory serverChannelFactory =
new NioServerSocketChannelFactory(executor, executor); new NioServerSocketChannelFactory(executor);
HttpTunnelServerChannelFactory serverTunnelFactory = HttpTunnelServerChannelFactory serverTunnelFactory =
new HttpTunnelServerChannelFactory(serverChannelFactory); new HttpTunnelServerChannelFactory(serverChannelFactory);
@ -105,7 +105,7 @@ public class HttpTunnelSoakTester {
serverBootstrap.setPipelineFactory(createServerPipelineFactory()); serverBootstrap.setPipelineFactory(createServerPipelineFactory());
ClientSocketChannelFactory clientChannelFactory = ClientSocketChannelFactory clientChannelFactory =
new NioClientSocketChannelFactory(executor, executor); new NioClientSocketChannelFactory(executor);
HttpTunnelClientChannelFactory clientTunnelFactory = HttpTunnelClientChannelFactory clientTunnelFactory =
new HttpTunnelClientChannelFactory(clientChannelFactory); new HttpTunnelClientChannelFactory(clientChannelFactory);

View File

@ -88,12 +88,10 @@ public class HttpTunnelTest {
clientFactory = clientFactory =
new HttpTunnelClientChannelFactory( new HttpTunnelClientChannelFactory(
new NioClientSocketChannelFactory( new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
serverFactory = serverFactory =
new HttpTunnelServerChannelFactory( new HttpTunnelServerChannelFactory(
new NioServerSocketChannelFactory( new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())); Executors.newCachedThreadPool()));
clientBootstrap = new ClientBootstrap(clientFactory); clientBootstrap = new ClientBootstrap(clientFactory);

View File

@ -40,12 +40,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel { abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel implements NioChannel {
/** /**
* The {@link AbstractNioWorker}. * The {@link AbstractNioWorker}.
*/ */
final AbstractNioWorker worker; private final AbstractNioWorker worker;
/** /**
* Monitor object to synchronize access to InterestedOps. * Monitor object to synchronize access to InterestedOps.

View File

@ -31,7 +31,7 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink {
if (ch instanceof AbstractNioChannel<?>) { if (ch instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch; AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
channel.worker.executeInIoThread(wrapper); channel.getWorker().executeInIoThread(wrapper);
return wrapper; return wrapper;
} }
return super.execute(pipeline, task); return super.execute(pipeline, task);
@ -44,7 +44,7 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink {
Channel channel = event.getChannel(); Channel channel = event.getChannel();
boolean fireLater = false; boolean fireLater = false;
if (channel instanceof AbstractNioChannel<?>) { if (channel instanceof AbstractNioChannel<?>) {
fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel<?>) channel); fireLater = !((AbstractNioChannel<?>) channel).getWorker().isIoThread();
} }
return fireLater; return fireLater;
} }

View File

@ -20,6 +20,7 @@ import static io.netty.channel.Channels.*;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.channel.socket.Worker; import io.netty.channel.socket.Worker;
import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
@ -29,12 +30,16 @@ import io.netty.util.internal.DeadLockProofWorker;
import io.netty.util.internal.QueueFactory; import io.netty.util.internal.QueueFactory;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.Iterator; import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
@ -76,7 +81,7 @@ abstract class AbstractNioWorker implements Worker {
/** /**
* The NIO {@link Selector}. * The NIO {@link Selector}.
*/ */
volatile Selector selector; protected volatile Selector selector;
/** /**
* Boolean that controls determines if a blocked Selector.select should * Boolean that controls determines if a blocked Selector.select should
@ -124,18 +129,74 @@ abstract class AbstractNioWorker implements Worker {
this.allowShutdownOnIdle = allowShutdownOnIdle; this.allowShutdownOnIdle = allowShutdownOnIdle;
} }
void register(AbstractNioChannel<?> channel, ChannelFuture future) { public final void registerWithWorker(final Channel channel, final ChannelFuture future) {
Runnable registerTask = createRegisterTask(channel, future); final Selector selector = start();
Selector selector = start();
try {
if (channel instanceof NioServerSocketChannel) {
final NioServerSocketChannel ch = (NioServerSocketChannel) channel;
registerTaskQueue.add(new Runnable() {
boolean offered = registerTaskQueue.offer(registerTask); @Override
assert offered; public void run() {
try {
ch.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
});
} else if (channel instanceof NioClientSocketChannel) {
final NioClientSocketChannel clientChannel = (NioClientSocketChannel) channel;
registerTaskQueue.add(new Runnable() {
@Override
public void run() {
try {
try {
clientChannel.channel.register(selector, clientChannel.getRawInterestOps() | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE, channel);
} catch (ClosedChannelException e) {
clientChannel.getWorker().close(clientChannel, succeededFuture(channel));
}
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
if (connectTimeout > 0) {
clientChannel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
});
} else if (channel instanceof AbstractNioChannel<?>) {
registerTaskQueue.add(new Runnable() {
@Override
public void run() {
try {
registerTask((AbstractNioChannel<?>) channel, future);
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
});
} else {
throw new UnsupportedOperationException("Unable to handle channel " + channel);
}
if (wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
} }
/** /**
@ -145,7 +206,7 @@ abstract class AbstractNioWorker implements Worker {
*/ */
private Selector start() { private Selector start() {
synchronized (startStopLock) { synchronized (startStopLock) {
if (!started) { if (!started && selector == null) {
// Open a selector if this worker didn't start yet. // Open a selector if this worker didn't start yet.
try { try {
this.selector = Selector.open(); this.selector = Selector.open();
@ -182,6 +243,7 @@ abstract class AbstractNioWorker implements Worker {
@Override @Override
public void run() { public void run() {
thread = Thread.currentThread(); thread = Thread.currentThread();
long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
boolean shutdown = false; boolean shutdown = false;
Selector selector = this.selector; Selector selector = this.selector;
@ -236,6 +298,13 @@ abstract class AbstractNioWorker implements Worker {
processWriteTaskQueue(); processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
// Handle connection timeout every 10 milliseconds approximately.
long currentTimeNanos = System.nanoTime();
if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) {
lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
processConnectTimeout(selector.keys(), currentTimeNanos);
}
// Exit the loop when there's nothing to handle. // Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this // The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when // loop to avoid excessive Selector creation when
@ -298,7 +367,7 @@ abstract class AbstractNioWorker implements Worker {
* fashion even if the current Thread == IO Thread * fashion even if the current Thread == IO Thread
*/ */
public void executeInIoThread(Runnable task, boolean alwaysAsync) { public void executeInIoThread(Runnable task, boolean alwaysAsync) {
if (!alwaysAsync && Thread.currentThread() == thread) { if (!alwaysAsync && isIoThread()) {
task.run(); task.run();
} else { } else {
start(); start();
@ -334,7 +403,6 @@ abstract class AbstractNioWorker implements Worker {
if (task == null) { if (task == null) {
break; break;
} }
task.run(); task.run();
cleanUpCancelledKeys(); cleanUpCancelledKeys();
} }
@ -354,8 +422,9 @@ abstract class AbstractNioWorker implements Worker {
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException { private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next(); SelectionKey k = i.next();
i.remove(); boolean removeKey = true;
try { try {
int readyOps = k.readyOps(); int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
if (!read(k)) { if (!read(k)) {
@ -366,9 +435,23 @@ abstract class AbstractNioWorker implements Worker {
if ((readyOps & SelectionKey.OP_WRITE) != 0) { if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k); writeFromSelectorLoop(k);
} }
if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
removeKey = accept(k);
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
connect(k);
}
} catch (CancelledKeyException e) { } catch (CancelledKeyException e) {
close(k); close(k);
} finally {
if (removeKey) {
i.remove();
} }
}
if (cleanUpCancelledKeys()) { if (cleanUpCancelledKeys()) {
break; // break the loop to avoid ConcurrentModificationException break; // break the loop to avoid ConcurrentModificationException
@ -376,6 +459,92 @@ abstract class AbstractNioWorker implements Worker {
} }
} }
private boolean accept(SelectionKey key) {
NioServerSocketChannel channel = (NioServerSocketChannel) key.attachment();
try {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
// TODO: Remove the casting stuff
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
registerTask(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel,
channel.getPipeline().getSink(), acceptedSocket, (NioWorker) this), null);
return true;
}
return false;
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", e);
}
}
return true;
}
private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
ConnectException cause = null;
for (SelectionKey k: keys) {
if (!k.isValid()) {
// Comment the close call again as it gave us major problems with ClosedChannelExceptions.
//
// See:
// * https://github.com/netty/netty/issues/142
// * https://github.com/netty/netty/issues/138
//
//close(k);
continue;
}
// Something is ready so skip it
if (k.readyOps() != 0) {
continue;
}
// check if the channel is in
Object attachment = k.attachment();
if (attachment instanceof NioClientSocketChannel) {
NioClientSocketChannel ch = (NioClientSocketChannel) attachment;
if (!ch.isConnected() && ch.connectDeadlineNanos > 0 && currentTimeNanos >= ch.connectDeadlineNanos) {
if (cause == null) {
cause = new ConnectException("connection timed out");
}
ch.connectFuture.setFailure(cause);
fireExceptionCaught(ch, cause);
ch.getWorker().close(ch, succeededFuture(ch));
}
}
}
}
private void connect(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
try {
if (ch.channel.isConnectionPending() && ch.channel.finishConnect()) {
registerTask(ch, ch.connectFuture);
}
} catch (Throwable t) {
ch.connectFuture.setFailure(t);
fireExceptionCaught(ch, t);
k.cancel(); // Some JDK implementations run into an infinite loop without this.
ch.getWorker().close(ch, succeededFuture(ch));
}
}
private boolean cleanUpCancelledKeys() throws IOException { private boolean cleanUpCancelledKeys() throws IOException {
if (cancelledKeys >= CLEANUP_INTERVAL) { if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0; cancelledKeys = 0;
@ -388,8 +557,16 @@ abstract class AbstractNioWorker implements Worker {
private void close(SelectionKey k) { private void close(SelectionKey k) {
AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment(); Object attachment = k.attachment();
if (attachment instanceof AbstractNioChannel<?>) {
AbstractNioChannel<?> ch = (AbstractNioChannel<?>) attachment;
close(ch, succeededFuture(ch)); close(ch, succeededFuture(ch));
} else if (attachment instanceof NioServerSocketChannel) {
NioServerSocketChannel ch = (NioServerSocketChannel) attachment;
close(ch, succeededFuture(ch));
} else {
// TODO: What todo ?
}
} }
void writeFromUserCode(final AbstractNioChannel<?> channel) { void writeFromUserCode(final AbstractNioChannel<?> channel) {
@ -397,7 +574,6 @@ abstract class AbstractNioWorker implements Worker {
cleanUpWriteBuffer(channel); cleanUpWriteBuffer(channel);
return; return;
} }
if (scheduleWriteIfNecessary(channel)) { if (scheduleWriteIfNecessary(channel)) {
return; return;
} }
@ -434,7 +610,7 @@ abstract class AbstractNioWorker implements Worker {
boolean open = true; boolean open = true;
boolean addOpWrite = false; boolean addOpWrite = false;
boolean removeOpWrite = false; boolean removeOpWrite = false;
boolean iothread = isIoThread(channel); boolean iothread = isIoThread();
long writtenBytes = 0; long writtenBytes = 0;
@ -539,8 +715,12 @@ abstract class AbstractNioWorker implements Worker {
} }
} }
static boolean isIoThread(AbstractNioChannel<?> channel) { /**
return Thread.currentThread() == channel.worker.thread; * Return <code>true</code> if the current executing thread is the same as the one that runs the {@link #run()} method
*
*/
boolean isIoThread() {
return Thread.currentThread() == thread;
} }
private void setOpWrite(AbstractNioChannel<?> channel) { private void setOpWrite(AbstractNioChannel<?> channel) {
@ -590,10 +770,59 @@ abstract class AbstractNioWorker implements Worker {
} }
void close(NioServerSocketChannel channel, ChannelFuture future) {
boolean isIoThread = isIoThread();
boolean bound = channel.isBound();
try {
if (channel.socket.isOpen()) {
channel.socket.close();
Selector selector = channel.selector;
if (selector != null) {
selector.wakeup();
}
}
// Make sure the boss thread is not running so that that the future
// is notified after a new connection cannot be accepted anymore.
// See NETTY-256 for more information.
channel.shutdownLock.lock();
try {
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
if (isIoThread) {
fireChannelUnbound(channel);
} else {
fireChannelUnboundLater(channel);
}
}
if (isIoThread) {
fireChannelClosed(channel);
} else {
fireChannelClosedLater(channel);
}
} else {
future.setSuccess();
}
} finally {
channel.shutdownLock.unlock();
}
} catch (Throwable t) {
future.setFailure(t);
if (isIoThread) {
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
}
}
void close(AbstractNioChannel<?> channel, ChannelFuture future) { void close(AbstractNioChannel<?> channel, ChannelFuture future) {
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();
boolean bound = channel.isBound(); boolean bound = channel.isBound();
boolean iothread = isIoThread(channel); boolean iothread = isIoThread();
try { try {
channel.channel.close(); channel.channel.close();
@ -630,6 +859,7 @@ abstract class AbstractNioWorker implements Worker {
if (iothread) { if (iothread) {
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} else { } else {
System.out.println(thread + "==" + channel.getWorker().thread);
fireExceptionCaughtLater(channel, t); fireExceptionCaughtLater(channel, t);
} }
} }
@ -684,7 +914,7 @@ abstract class AbstractNioWorker implements Worker {
} }
if (fireExceptionCaught) { if (fireExceptionCaught) {
if (isIoThread(channel)) { if (isIoThread()) {
fireExceptionCaught(channel, cause); fireExceptionCaught(channel, cause);
} else { } else {
fireExceptionCaughtLater(channel, cause); fireExceptionCaughtLater(channel, cause);
@ -694,7 +924,7 @@ abstract class AbstractNioWorker implements Worker {
void setInterestOps(AbstractNioChannel<?> channel, ChannelFuture future, int interestOps) { void setInterestOps(AbstractNioChannel<?> channel, ChannelFuture future, int interestOps) {
boolean changed = false; boolean changed = false;
boolean iothread = isIoThread(channel); boolean iothread = isIoThread();
try { try {
// interestOps can change at any time and at any thread. // interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition. // Acquire a lock to avoid possible race condition.
@ -731,7 +961,7 @@ abstract class AbstractNioWorker implements Worker {
case 0: case 0:
if (channel.getRawInterestOps() != interestOps) { if (channel.getRawInterestOps() != interestOps) {
key.interestOps(interestOps); key.interestOps(interestOps);
if (Thread.currentThread() != thread && if (!iothread &&
wakenUp.compareAndSet(false, true)) { wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
} }
@ -741,7 +971,7 @@ abstract class AbstractNioWorker implements Worker {
case 1: case 1:
case 2: case 2:
if (channel.getRawInterestOps() != interestOps) { if (channel.getRawInterestOps() != interestOps) {
if (Thread.currentThread() == thread) { if (iothread) {
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} else { } else {
@ -803,13 +1033,6 @@ abstract class AbstractNioWorker implements Worker {
*/ */
protected abstract boolean read(SelectionKey k); protected abstract boolean read(SelectionKey k);
/** protected abstract void registerTask(AbstractNioChannel<?> channel, ChannelFuture future);
* Create a new {@link Runnable} which will register the {@link AbstractNioWorker} with the {@link Channel}
*
* @param channel
* @param future
* @return task
*/
protected abstract Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future);
} }

View File

@ -26,13 +26,12 @@ import io.netty.channel.ChannelSink;
final class NioAcceptedSocketChannel extends NioSocketChannel { final class NioAcceptedSocketChannel extends NioSocketChannel {
final Thread bossThread;
static NioAcceptedSocketChannel create(ChannelFactory factory, static NioAcceptedSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, Channel parent, ChannelSink sink, ChannelPipeline pipeline, Channel parent, ChannelSink sink,
SocketChannel socket, NioWorker worker, Thread bossThread) { SocketChannel socket, NioWorker worker) {
NioAcceptedSocketChannel instance = new NioAcceptedSocketChannel( NioAcceptedSocketChannel instance = new NioAcceptedSocketChannel(
factory, pipeline, parent, sink, socket, worker, bossThread); factory, pipeline, parent, sink, socket, worker);
instance.setConnected(); instance.setConnected();
fireChannelOpen(instance); fireChannelOpen(instance);
return instance; return instance;
@ -41,10 +40,8 @@ final class NioAcceptedSocketChannel extends NioSocketChannel {
private NioAcceptedSocketChannel( private NioAcceptedSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline, ChannelFactory factory, ChannelPipeline pipeline,
Channel parent, ChannelSink sink, Channel parent, ChannelSink sink,
SocketChannel socket, NioWorker worker, Thread bossThread) { SocketChannel socket, NioWorker worker) {
super(parent, factory, pipeline, sink, socket, worker); super(parent, factory, pipeline, sink, socket, worker);
this.bossThread = bossThread;
} }
} }

View File

@ -0,0 +1,23 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
public interface NioChannel extends Channel {
AbstractNioWorker getWorker();
}

View File

@ -26,7 +26,6 @@ import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.ClientSocketChannelFactory; import io.netty.channel.socket.ClientSocketChannelFactory;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.util.ExternalResourceReleasable; import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
/** /**
* A {@link ClientSocketChannelFactory} which creates a client-side NIO-based * A {@link ClientSocketChannelFactory} which creates a client-side NIO-based
@ -80,19 +79,17 @@ import io.netty.util.internal.ExecutorUtil;
*/ */
public class NioClientSocketChannelFactory implements ClientSocketChannelFactory { public class NioClientSocketChannelFactory implements ClientSocketChannelFactory {
private static final int DEFAULT_BOSS_COUNT = 1;
private final Executor bossExecutor;
private final WorkerPool<NioWorker> workerPool; private final WorkerPool<NioWorker> workerPool;
private final NioClientSocketPipelineSink sink; private final NioClientSocketPipelineSink sink;
/** /**
* Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} for the worker and boss executors. * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} for the worker executor.
* *
* See {@link #NioClientSocketChannelFactory(Executor, Executor)} * See {@link #NioClientSocketChannelFactory(Executor, Executor)}
*/ */
public NioClientSocketChannelFactory() { public NioClientSocketChannelFactory() {
this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); this(Executors.newCachedThreadPool());
} }
/** /**
@ -102,74 +99,37 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
* <tt>bossCount</tt> and <tt>workerCount</tt> respectively. The number of * <tt>bossCount</tt> and <tt>workerCount</tt> respectively. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}. * available processors is obtained by {@link Runtime#availableProcessors()}.
* *
* @param bossExecutor
* the {@link Executor} which will execute the boss thread
* @param workerExecutor * @param workerExecutor
* the {@link Executor} which will execute the worker threads * the {@link Executor} which will execute the worker threads
*/ */
public NioClientSocketChannelFactory( public NioClientSocketChannelFactory(Executor workerExecutor) {
Executor bossExecutor, Executor workerExecutor) { this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, SelectorUtil.DEFAULT_IO_THREADS);
} }
/** /**
* Creates a new instance. Calling this constructor is same with calling * Creates a new instance. Calling this constructor is same with calling
* {@link #NioClientSocketChannelFactory(Executor, Executor, int, int)} with * {@link #NioClientSocketChannelFactory(Executor, int, int)} with
* 1 as <tt>bossCount</tt>. * 1 as <tt>bossCount</tt>.
* *
* @param bossExecutor
* the {@link Executor} which will execute the boss thread
* @param workerExecutor * @param workerExecutor
* the {@link Executor} which will execute the worker threads * the {@link Executor} which will execute the worker threads
* @param workerCount * @param workerCount
* the maximum number of worker threads * the maximum number of worker threads
*/ */
public NioClientSocketChannelFactory( public NioClientSocketChannelFactory(Executor workerExecutor,
Executor bossExecutor, Executor workerExecutor,
int workerCount) { int workerCount) {
this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount); this(new NioWorkerPool(workerExecutor, workerCount, true));
} }
/** public NioClientSocketChannelFactory(WorkerPool<NioWorker> workerPool) {
* Creates a new instance.
*
* @param bossExecutor
* the {@link Executor} which will execute the boss thread
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param bossCount
* the maximum number of boss threads
* @param workerCount
* the maximum number of worker threads
*/
public NioClientSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int bossCount, int workerCount) {
this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount, true));
}
public NioClientSocketChannelFactory(
Executor bossExecutor, int bossCount,
WorkerPool<NioWorker> workerPool) {
if (bossExecutor == null) {
throw new NullPointerException("bossExecutor");
}
if (workerPool == null) { if (workerPool == null) {
throw new NullPointerException("workerPool"); throw new NullPointerException("workerPool");
} }
if (bossCount <= 0) {
throw new IllegalArgumentException(
"bossCount (" + bossCount + ") " +
"must be a positive integer.");
}
this.bossExecutor = bossExecutor;
this.workerPool = workerPool; this.workerPool = workerPool;
sink = new NioClientSocketPipelineSink( sink = new NioClientSocketPipelineSink(workerPool);
bossExecutor, bossCount, workerPool);
} }
@ -180,7 +140,6 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
@Override @Override
public void releaseExternalResources() { public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor);
if (workerPool instanceof ExternalResourceReleasable) { if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources(); ((ExternalResourceReleasable) workerPool).releaseExternalResources();
} }

View File

@ -15,24 +15,10 @@
*/ */
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.*; import static io.netty.channel.Channels.fireChannelBound;
import static io.netty.channel.Channels.fireExceptionCaught;
import java.io.IOException; import static io.netty.channel.Channels.succeededFuture;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
@ -41,31 +27,18 @@ import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker;
import io.netty.util.internal.QueueFactory; import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
class NioClientSocketPipelineSink extends AbstractNioChannelSink { class NioClientSocketPipelineSink extends AbstractNioChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
final Executor bossExecutor;
private final Boss[] bosses;
private final AtomicInteger bossIndex = new AtomicInteger();
private final WorkerPool<NioWorker> workerPool; private final WorkerPool<NioWorker> workerPool;
NioClientSocketPipelineSink( NioClientSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
this.bossExecutor = bossExecutor;
bosses = new Boss[bossCount];
for (int i = 0; i < bosses.length; i ++) {
bosses[i] = new Boss();
}
this.workerPool = workerPool; this.workerPool = workerPool;
} }
@ -83,25 +56,25 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future); channel.getWorker().close(channel, future);
} }
break; break;
case BOUND: case BOUND:
if (value != null) { if (value != null) {
bind(channel, future, (SocketAddress) value); bind(channel, future, (SocketAddress) value);
} else { } else {
channel.worker.close(channel, future); channel.getWorker().close(channel, future);
} }
break; break;
case CONNECTED: case CONNECTED:
if (value != null) { if (value != null) {
connect(channel, future, (SocketAddress) value); connect(channel, future, (SocketAddress) value);
} else { } else {
channel.worker.close(channel, future); channel.getWorker().close(channel, future);
} }
break; break;
case INTEREST_OPS: case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue());
break; break;
} }
} else if (e instanceof MessageEvent) { } else if (e instanceof MessageEvent) {
@ -109,7 +82,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
NioSocketChannel channel = (NioSocketChannel) event.getChannel(); NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBufferQueue.offer(event); boolean offered = channel.writeBufferQueue.offer(event);
assert offered; assert offered;
channel.worker.writeFromUserCode(channel); channel.getWorker().writeFromUserCode(channel);
} }
} }
@ -132,9 +105,8 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
final NioClientSocketChannel channel, final ChannelFuture cf, final NioClientSocketChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) { SocketAddress remoteAddress) {
try { try {
if (channel.channel.connect(remoteAddress)) { channel.channel.connect(remoteAddress);
channel.worker.register(channel, cf);
} else {
channel.getCloseFuture().addListener(new ChannelFutureListener() { channel.getCloseFuture().addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture f) public void operationComplete(ChannelFuture f)
@ -146,293 +118,18 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
}); });
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = cf; channel.connectFuture = cf;
nextBoss().register(channel); nextWorker().registerWithWorker(channel, cf);
} //nextBoss().register(channel);
} catch (Throwable t) { } catch (Throwable t) {
cf.setFailure(t); cf.setFailure(t);
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
channel.worker.close(channel, succeededFuture(channel)); channel.getWorker().close(channel, succeededFuture(channel));
} }
} }
NioWorker nextWorker() { NioWorker nextWorker() {
return workerPool.nextWorker(); return workerPool.nextWorker();
} }
Boss nextBoss() {
return bosses[Math.abs(
bossIndex.getAndIncrement() % bosses.length)];
}
private final class Boss implements Runnable {
volatile Selector selector;
private boolean started;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
Boss() {
}
void register(NioClientSocketChannel channel) {
Runnable registerTask = new RegisterTask(this, channel);
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(bossExecutor, this);
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", t);
}
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
@Override
public void run() {
boolean shutdown = false;
Selector selector = this.selector;
long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
for (;;) {
wakenUp.set(false);
try {
int selectedKeyCount = selector.select(10);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
processRegisterTaskQueue();
if (selectedKeyCount > 0) {
processSelectedKeys(selector.selectedKeys());
}
// Handle connection timeout every 10 milliseconds approximately.
long currentTimeNanos = System.nanoTime();
if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) {
lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
processConnectTimeout(selector.keys(), currentTimeNanos);
}
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connection attempts are made in a one-by-one manner
// instead of concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown ||
bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
started = false;
try {
selector.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a selector.", e);
}
} finally {
this.selector = null;
}
break;
} else {
shutdown = false;
}
}
} else {
// Give one more second.
shutdown = true;
}
} else {
shutdown = false;
}
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"Unexpected exception in the selector loop.", t);
}
// Prevent possible consecutive immediate failures.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
private void processRegisterTaskQueue() {
for (;;) {
final Runnable task = registerTaskQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
if (!k.isValid()) {
close(k);
continue;
}
if (k.isConnectable()) {
connect(k);
}
}
}
private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
ConnectException cause = null;
for (SelectionKey k: keys) {
if (!k.isValid()) {
// Comment the close call again as it gave us major problems with ClosedChannelExceptions.
//
// See:
// * https://github.com/netty/netty/issues/142
// * https://github.com/netty/netty/issues/138
//
//close(k);
continue;
}
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
if (ch.connectDeadlineNanos > 0 &&
currentTimeNanos >= ch.connectDeadlineNanos) {
if (cause == null) {
cause = new ConnectException("connection timed out");
}
ch.connectFuture.setFailure(cause);
fireExceptionCaught(ch, cause);
ch.worker.close(ch, succeededFuture(ch));
}
}
}
private void connect(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
try {
if (ch.channel.finishConnect()) {
k.cancel();
ch.worker.register(ch, ch.connectFuture);
}
} catch (Throwable t) {
ch.connectFuture.setFailure(t);
fireExceptionCaught(ch, t);
k.cancel(); // Some JDK implementations run into an infinite loop without this.
ch.worker.close(ch, succeededFuture(ch));
}
}
private void close(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
ch.worker.close(ch, succeededFuture(ch));
}
}
private static final class RegisterTask implements Runnable {
private final Boss boss;
private final NioClientSocketChannel channel;
RegisterTask(Boss boss, NioClientSocketChannel channel) {
this.boss = boss;
this.channel = channel;
}
@Override
public void run() {
try {
channel.channel.register(
boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel));
}
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
if (connectTimeout > 0) {
channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
}
}
}
} }

View File

@ -70,32 +70,32 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future); channel.getWorker().close(channel, future);
} }
break; break;
case BOUND: case BOUND:
if (value != null) { if (value != null) {
bind(channel, future, (InetSocketAddress) value); bind(channel, future, (InetSocketAddress) value);
} else { } else {
channel.worker.close(channel, future); channel.getWorker().close(channel, future);
} }
break; break;
case CONNECTED: case CONNECTED:
if (value != null) { if (value != null) {
connect(channel, future, (InetSocketAddress) value); connect(channel, future, (InetSocketAddress) value);
} else { } else {
NioDatagramWorker.disconnect(channel, future); channel.getWorker().disconnect(channel, future);
} }
break; break;
case INTEREST_OPS: case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue());
break; break;
} }
} else if (e instanceof MessageEvent) { } else if (e instanceof MessageEvent) {
final MessageEvent event = (MessageEvent) e; final MessageEvent event = (MessageEvent) e;
final boolean offered = channel.writeBufferQueue.offer(event); final boolean offered = channel.writeBufferQueue.offer(event);
assert offered; assert offered;
channel.worker.writeFromUserCode(channel); channel.getWorker().writeFromUserCode(channel);
} }
} }
@ -133,7 +133,7 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
future.setSuccess(); future.setSuccess();
fireChannelBound(channel, address); fireChannelBound(channel, address);
channel.worker.register(channel, null); channel.getWorker().registerWithWorker(channel, null);
started = true; started = true;
} catch (final Throwable t) { } catch (final Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -171,16 +171,15 @@ class NioDatagramPipelineSink extends AbstractNioChannelSink {
fireChannelConnected(channel, channel.getRemoteAddress()); fireChannelConnected(channel, channel.getRemoteAddress());
if (!bound) { if (!bound) {
channel.worker.register(channel, future); channel.getWorker().registerWithWorker(channel, future);
} }
workerStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} finally { } finally {
if (connected && !workerStarted) { if (connected && !workerStarted) {
channel.worker.close(channel, future); channel.getWorker().close(channel, future);
} }
} }
} }

View File

@ -109,8 +109,7 @@ public class NioDatagramWorker extends AbstractNioWorker {
@Override @Override
protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) { protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
final Thread workerThread = thread; if (!isIoThread()) {
if (workerThread == null || Thread.currentThread() != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
// "add" the channels writeTask to the writeTaskQueue. // "add" the channels writeTask to the writeTaskQueue.
boolean offered = writeTaskQueue.offer(channel.writeTask); boolean offered = writeTaskQueue.offer(channel.writeTask);
@ -130,9 +129,9 @@ public class NioDatagramWorker extends AbstractNioWorker {
} }
static void disconnect(NioDatagramChannel channel, ChannelFuture future) { void disconnect(NioDatagramChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();
boolean iothread = isIoThread(channel); boolean iothread = isIoThread();
try { try {
channel.getDatagramChannel().disconnect(); channel.getDatagramChannel().disconnect();
future.setSuccess(); future.setSuccess();
@ -155,31 +154,7 @@ public class NioDatagramWorker extends AbstractNioWorker {
@Override @Override
protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) { protected void registerTask(AbstractNioChannel<?> channel, ChannelFuture future) {
return new ChannelRegistionTask((NioDatagramChannel) channel, future);
}
/**
* RegisterTask is a task responsible for registering a channel with a
* selector.
*/
private final class ChannelRegistionTask implements Runnable {
private final NioDatagramChannel channel;
private final ChannelFuture future;
ChannelRegistionTask(final NioDatagramChannel channel,
final ChannelFuture future) {
this.channel = channel;
this.future = future;
}
/**
* This runnable's task. Does the actual registering by calling the
* underlying DatagramChannels peer DatagramSocket register method.
*/
@Override
public void run() {
final SocketAddress localAddress = channel.getLocalAddress(); final SocketAddress localAddress = channel.getLocalAddress();
if (localAddress == null) { if (localAddress == null) {
if (future != null) { if (future != null) {
@ -191,7 +166,7 @@ public class NioDatagramWorker extends AbstractNioWorker {
try { try {
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
channel.getDatagramChannel().register( ((NioDatagramChannel) channel).getDatagramChannel().register(
selector, channel.getRawInterestOps(), channel); selector, channel.getRawInterestOps(), channel);
} }
if (future != null) { if (future != null) {
@ -206,6 +181,6 @@ public class NioDatagramWorker extends AbstractNioWorker {
"Failed to register a socket to the selector.", e); "Failed to register a socket to the selector.", e);
} }
} }
}
} }

View File

@ -35,7 +35,7 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
final class NioServerSocketChannel extends AbstractServerChannel final class NioServerSocketChannel extends AbstractServerChannel
implements io.netty.channel.socket.ServerSocketChannel { implements io.netty.channel.socket.ServerSocketChannel, NioChannel {
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketChannel.class); InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
@ -43,12 +43,15 @@ final class NioServerSocketChannel extends AbstractServerChannel
final ServerSocketChannel socket; final ServerSocketChannel socket;
final Lock shutdownLock = new ReentrantLock(); final Lock shutdownLock = new ReentrantLock();
volatile Selector selector; volatile Selector selector;
final NioWorker worker;
private final ServerSocketChannelConfig config; private final ServerSocketChannelConfig config;
static NioServerSocketChannel create(ChannelFactory factory, static NioServerSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) { ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) {
NioServerSocketChannel instance = NioServerSocketChannel instance =
new NioServerSocketChannel(factory, pipeline, sink); new NioServerSocketChannel(factory, pipeline, sink, worker);
fireChannelOpen(instance); fireChannelOpen(instance);
return instance; return instance;
} }
@ -56,10 +59,10 @@ final class NioServerSocketChannel extends AbstractServerChannel
private NioServerSocketChannel( private NioServerSocketChannel(
ChannelFactory factory, ChannelFactory factory,
ChannelPipeline pipeline, ChannelPipeline pipeline,
ChannelSink sink) { ChannelSink sink, NioWorker worker) {
super(factory, pipeline, sink); super(factory, pipeline, sink);
this.worker = worker;
try { try {
socket = ServerSocketChannel.open(); socket = ServerSocketChannel.open();
} catch (IOException e) { } catch (IOException e) {
@ -110,4 +113,9 @@ final class NioServerSocketChannel extends AbstractServerChannel
protected boolean setClosed() { protected boolean setClosed() {
return super.setClosed(); return super.setClosed();
} }
@Override
public NioWorker getWorker() {
return worker;
}
} }

View File

@ -22,13 +22,11 @@ import java.util.concurrent.RejectedExecutionException;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelFactory; import io.netty.channel.socket.ServerSocketChannelFactory;
import io.netty.channel.socket.Worker; import io.netty.channel.socket.Worker;
import io.netty.util.ExternalResourceReleasable; import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
/** /**
* A {@link ServerSocketChannelFactory} which creates a server-side NIO-based * A {@link ServerSocketChannelFactory} which creates a server-side NIO-based
@ -85,18 +83,17 @@ import io.netty.util.internal.ExecutorUtil;
*/ */
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor;
private final WorkerPool<NioWorker> workerPool; private final WorkerPool<NioWorker> workerPool;
private final ChannelSink sink; private final NioServerSocketPipelineSink sink;
/** /**
* Create a new {@link NioServerSocketChannelFactory} using * Create a new {@link NioServerSocketChannelFactory} using
* {@link Executors#newCachedThreadPool()} for the boss and worker. * {@link Executors#newCachedThreadPool()} for the worker.
* *
* See {@link #NioServerSocketChannelFactory(Executor, Executor)} * See {@link #NioServerSocketChannelFactory(Executor, Executor)}
*/ */
public NioServerSocketChannelFactory() { public NioServerSocketChannelFactory() {
this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); this(Executors.newCachedThreadPool());
} }
@ -106,50 +103,38 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
* the number of available processors in the machine. The number of * the number of available processors in the machine. The number of
* available processors is obtained by {@link Runtime#availableProcessors()}. * available processors is obtained by {@link Runtime#availableProcessors()}.
* *
* @param bossExecutor
* the {@link Executor} which will execute the boss threads
* @param workerExecutor * @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads * the {@link Executor} which will execute the I/O worker threads
*/ */
public NioServerSocketChannelFactory( public NioServerSocketChannelFactory(Executor workerExecutor) {
Executor bossExecutor, Executor workerExecutor) { this(workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
} }
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param bossExecutor
* the {@link Executor} which will execute the boss threads
* @param workerExecutor * @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads * the {@link Executor} which will execute the I/O worker threads
* @param workerCount * @param workerCount
* the maximum number of I/O worker threads * the maximum number of I/O worker threads
*/ */
public NioServerSocketChannelFactory( public NioServerSocketChannelFactory(Executor workerExecutor,
Executor bossExecutor, Executor workerExecutor,
int workerCount) { int workerCount) {
this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount, true)); this(new NioWorkerPool(workerExecutor, workerCount, true));
} }
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param bossExecutor
* the {@link Executor} which will execute the boss threads
* @param workerPool * @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads * the {@link WorkerPool} which will be used to obtain the {@link Worker} that execute the I/O worker threads
*/ */
public NioServerSocketChannelFactory( public NioServerSocketChannelFactory(WorkerPool<NioWorker> workerPool) {
Executor bossExecutor, WorkerPool<NioWorker> workerPool) {
if (bossExecutor == null) {
throw new NullPointerException("bossExecutor");
}
if (workerPool == null) { if (workerPool == null) {
throw new NullPointerException("workerPool"); throw new NullPointerException("workerPool");
} }
this.bossExecutor = bossExecutor;
this.workerPool = workerPool; this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink(workerPool); sink = new NioServerSocketPipelineSink(workerPool);
} }
@ -157,12 +142,11 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
@Override @Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return NioServerSocketChannel.create(this, pipeline, sink); return NioServerSocketChannel.create(this, pipeline, sink, sink.nextWorker());
} }
@Override @Override
public void releaseExternalResources() { public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor);
if (workerPool instanceof ExternalResourceReleasable) { if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources(); ((ExternalResourceReleasable) workerPool).releaseExternalResources();
} }

View File

@ -17,16 +17,8 @@ package io.netty.channel.socket.nio;
import static io.netty.channel.Channels.*; import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
@ -37,7 +29,6 @@ import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker;
class NioServerSocketPipelineSink extends AbstractNioChannelSink { class NioServerSocketPipelineSink extends AbstractNioChannelSink {
@ -76,14 +67,14 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
close(channel, future); channel.worker.close(channel, future);
} }
break; break;
case BOUND: case BOUND:
if (value != null) { if (value != null) {
bind(channel, future, (SocketAddress) value); bind(channel, future, (SocketAddress) value);
} else { } else {
close(channel, future); channel.worker.close(channel, future);
} }
break; break;
} }
@ -100,17 +91,17 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future); channel.getWorker().close(channel, future);
} }
break; break;
case BOUND: case BOUND:
case CONNECTED: case CONNECTED:
if (value == null) { if (value == null) {
channel.worker.close(channel, future); channel.getWorker().close(channel, future);
} }
break; break;
case INTEREST_OPS: case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); channel.getWorker().setInterestOps(channel, future, ((Integer) value).intValue());
break; break;
} }
} else if (e instanceof MessageEvent) { } else if (e instanceof MessageEvent) {
@ -118,7 +109,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
NioSocketChannel channel = (NioSocketChannel) event.getChannel(); NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBufferQueue.offer(event); boolean offered = channel.writeBufferQueue.offer(event);
assert offered; assert offered;
channel.worker.writeFromUserCode(channel); channel.getWorker().writeFromUserCode(channel);
} }
} }
@ -127,7 +118,6 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
SocketAddress localAddress) { SocketAddress localAddress) {
boolean bound = false; boolean bound = false;
boolean bossStarted = false;
try { try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true; bound = true;
@ -135,157 +125,21 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
future.setSuccess(); future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress()); fireChannelBound(channel, channel.getLocalAddress());
Executor bossExecutor = nextWorker().registerWithWorker(channel, future);
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor, new Boss(channel));
bossStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} finally { } finally {
if (!bossStarted && bound) { if (!bound) {
close(channel, future); channel.worker.close(channel, future);
} }
} }
} }
private void close(NioServerSocketChannel channel, ChannelFuture future) {
boolean bound = channel.isBound();
try {
if (channel.socket.isOpen()) {
channel.socket.close();
Selector selector = channel.selector;
if (selector != null) {
selector.wakeup();
}
}
// Make sure the boss thread is not running so that that the future
// is notified after a new connection cannot be accepted anymore.
// See NETTY-256 for more information.
channel.shutdownLock.lock();
try {
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
}
} finally {
channel.shutdownLock.unlock();
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
NioWorker nextWorker() { NioWorker nextWorker() {
return workerPool.nextWorker(); return workerPool.nextWorker();
} }
private final class Boss implements Runnable {
private final Selector selector;
private final NioServerSocketChannel channel;
Boss(NioServerSocketChannel channel) throws IOException {
this.channel = channel;
selector = Selector.open();
boolean registered = false;
try {
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered = true;
} finally {
if (!registered) {
closeSelector();
}
}
channel.selector = selector;
}
@Override
public void run() {
final Thread currentThread = Thread.currentThread();
channel.shutdownLock.lock();
try {
for (;;) {
try {
if (selector.select(1000) > 0) {
selector.selectedKeys().clear();
}
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket, currentThread);
}
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
break;
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
} finally {
channel.shutdownLock.unlock();
closeSelector();
}
}
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
try {
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker();
worker.register(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket, worker, currentThread), null);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to initialize an accepted socket.", e);
}
try {
acceptedSocket.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
}
}
private void closeSelector() {
channel.selector = null;
try {
selector.close();
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", e);
}
}
}
}
} }

View File

@ -23,7 +23,7 @@ import io.netty.channel.ChannelSink;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
public class NioSocketChannel extends AbstractNioChannel<SocketChannel> public abstract class NioSocketChannel extends AbstractNioChannel<SocketChannel>
implements io.netty.channel.socket.SocketChannel { implements io.netty.channel.socket.SocketChannel {
private static final int ST_OPEN = 0; private static final int ST_OPEN = 0;

View File

@ -108,16 +108,13 @@ public class NioWorker extends AbstractNioWorker {
@Override @Override
protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) { protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
final Thread currentThread = Thread.currentThread(); if (!isIoThread()) {
final Thread workerThread = thread;
if (currentThread != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
boolean offered = writeTaskQueue.offer(channel.writeTask); boolean offered = writeTaskQueue.offer(channel.writeTask);
assert offered; assert offered;
} }
if (!(channel instanceof NioAcceptedSocketChannel) || if (!(channel instanceof NioAcceptedSocketChannel)) {
((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
final Selector workerSelector = selector; final Selector workerSelector = selector;
if (workerSelector != null) { if (workerSelector != null) {
if (wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
@ -143,26 +140,8 @@ public class NioWorker extends AbstractNioWorker {
} }
@Override @Override
protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) { protected void registerTask(AbstractNioChannel<?> channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel); boolean server = !(channel instanceof NioClientSocketChannel);
return new RegisterTask((NioSocketChannel) channel, future, server);
}
private final class RegisterTask implements Runnable {
private final NioSocketChannel channel;
private final ChannelFuture future;
private final boolean server;
RegisterTask(
NioSocketChannel channel, ChannelFuture future, boolean server) {
this.channel = channel;
this.future = future;
this.server = server;
}
@Override
public void run() {
SocketAddress localAddress = channel.getLocalAddress(); SocketAddress localAddress = channel.getLocalAddress();
SocketAddress remoteAddress = channel.getRemoteAddress(); SocketAddress remoteAddress = channel.getRemoteAddress();
@ -179,14 +158,19 @@ public class NioWorker extends AbstractNioWorker {
channel.channel.configureBlocking(false); channel.channel.configureBlocking(false);
} }
boolean registered = channel.channel.isRegistered();
if (!registered) {
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
channel.channel.register( channel.channel.register(
selector, channel.getRawInterestOps(), channel); selector, channel.getRawInterestOps(), channel);
} }
}
if (future != null) { if (future != null) {
channel.setConnected(); ((NioSocketChannel) channel).setConnected();
future.setSuccess(); future.setSuccess();
} }
} catch (IOException e) { } catch (IOException e) {
if (future != null) { if (future != null) {
future.setFailure(e); future.setFailure(e);
@ -203,6 +187,5 @@ public class NioWorker extends AbstractNioWorker {
} }
fireChannelConnected(channel, remoteAddress); fireChannelConnected(channel, remoteAddress);
} }
}
} }