From d2987071982a9f54c03a67cab780320cbf72763a Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 10 Aug 2012 20:17:18 +0900 Subject: [PATCH] [#502] Split EventLoop/EventExecutor into parent and children - Add EventExecutorGroup and EventLoopGroup - EventExecutor and EventLoop extends EventExecutorGroup and EventLoopGroup - They form their own group so that .next() returns itself. - Rename Bootstrap.eventLoop() to group() - Rename parameter names such as executor to group - Rename *EventLoop/Executor to *EventLoop/ExecutorGroup - Rename *ChildEventLoop/Executor to *EventLoop/Executor --- .../netty/example/discard/DiscardClient.java | 4 +- .../netty/example/discard/DiscardServer.java | 4 +- .../io/netty/example/echo/EchoClient.java | 4 +- .../io/netty/example/echo/EchoServer.java | 4 +- .../example/factorial/FactorialClient.java | 4 +- .../example/factorial/FactorialServer.java | 4 +- .../http/file/HttpStaticFileServer.java | 4 +- .../example/http/snoop/HttpSnoopClient.java | 4 +- .../example/http/snoop/HttpSnoopServer.java | 4 +- .../websocketx/autobahn/AutobahnServer.java | 4 +- .../websocketx/client/WebSocketClient.java | 4 +- .../websocketx/server/WebSocketServer.java | 4 +- .../sslserver/WebSocketSslServer.java | 4 +- .../io/netty/example/localecho/LocalEcho.java | 8 +- .../example/localtime/LocalTimeClient.java | 4 +- .../example/localtime/LocalTimeServer.java | 4 +- .../example/objectecho/ObjectEchoClient.java | 4 +- .../example/objectecho/ObjectEchoServer.java | 4 +- .../PortUnificationServer.java | 4 +- .../io/netty/example/proxy/HexDumpProxy.java | 4 +- .../proxy/HexDumpProxyFrontendHandler.java | 2 +- .../example/qotm/QuoteOfTheMomentClient.java | 4 +- .../example/qotm/QuoteOfTheMomentServer.java | 4 +- .../example/securechat/SecureChatClient.java | 4 +- .../example/securechat/SecureChatServer.java | 4 +- .../io/netty/example/telnet/TelnetClient.java | 4 +- .../io/netty/example/telnet/TelnetServer.java | 4 +- .../io/netty/example/uptime/UptimeClient.java | 10 +- .../socket/SocketTestPermutation.java | 26 +- .../java/io/netty/bootstrap/Bootstrap.java | 26 +- .../io/netty/bootstrap/ServerBootstrap.java | 42 +-- .../io/netty/channel/ChannelPipeline.java | 12 +- .../channel/DefaultChannelHandlerContext.java | 19 +- .../netty/channel/DefaultChannelPipeline.java | 24 +- .../netty/channel/DefaultEventExecutor.java | 33 +- ...or.java => DefaultEventExecutorGroup.java} | 33 +- .../java/io/netty/channel/EventExecutor.java | 8 +- .../io/netty/channel/EventExecutorGroup.java | 57 +++ .../main/java/io/netty/channel/EventLoop.java | 6 +- .../java/io/netty/channel/EventLoopGroup.java | 24 ++ ...ava => MultithreadEventExecutorGroup.java} | 121 +------ ...op.java => MultithreadEventLoopGroup.java} | 13 +- .../channel/SingleThreadEventExecutor.java | 25 +- .../netty/channel/SingleThreadEventLoop.java | 14 +- .../channel/embedded/EmbeddedEventLoop.java | 9 +- .../channel/local/LocalChildEventLoop.java | 55 --- .../netty/channel/local/LocalEventLoop.java | 44 ++- .../channel/local/LocalEventLoopGroup.java | 41 +++ .../socket/aio/AbstractAioChannel.java | 10 +- .../channel/socket/aio/AioChildEventLoop.java | 7 +- ...oEventLoop.java => AioEventLoopGroup.java} | 67 +++- .../socket/aio/AioServerSocketChannel.java | 4 +- .../channel/socket/aio/AioSocketChannel.java | 6 +- .../socket/nio/AbstractNioChannel.java | 6 +- .../channel/socket/nio/NioChildEventLoop.java | 235 ------------- .../channel/socket/nio/NioEventLoop.java | 227 ++++++++++-- .../channel/socket/nio/NioEventLoopGroup.java | 52 +++ .../socket/oio/AbstractOioChannel.java | 2 +- .../channel/socket/oio/OioChildEventLoop.java | 106 ------ .../channel/socket/oio/OioEventLoop.java | 327 ++++-------------- .../channel/socket/oio/OioEventLoopGroup.java | 176 ++++++++++ .../channel/SingleThreadEventLoopTest.java | 2 +- .../local/LocalChannelRegistryTest.java | 4 +- .../local/LocalTransportThreadModelTest.java | 26 +- 64 files changed, 958 insertions(+), 1051 deletions(-) rename transport/src/main/java/io/netty/channel/{DefaultChildEventExecutor.java => DefaultEventExecutorGroup.java} (52%) create mode 100644 transport/src/main/java/io/netty/channel/EventExecutorGroup.java create mode 100644 transport/src/main/java/io/netty/channel/EventLoopGroup.java rename transport/src/main/java/io/netty/channel/{MultithreadEventExecutor.java => MultithreadEventExecutorGroup.java} (52%) rename transport/src/main/java/io/netty/channel/{MultithreadEventLoop.java => MultithreadEventLoopGroup.java} (71%) delete mode 100644 transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java create mode 100644 transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java rename transport/src/main/java/io/netty/channel/socket/aio/{AioEventLoop.java => AioEventLoopGroup.java} (71%) delete mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java delete mode 100644 transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java create mode 100644 transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java diff --git a/example/src/main/java/io/netty/example/discard/DiscardClient.java b/example/src/main/java/io/netty/example/discard/DiscardClient.java index 195b68bea3..10742a5bd6 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardClient.java +++ b/example/src/main/java/io/netty/example/discard/DiscardClient.java @@ -17,7 +17,7 @@ package io.netty.example.discard; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; /** @@ -38,7 +38,7 @@ public class DiscardClient { public void run() throws Exception { Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioSocketChannel()) .remoteAddress(host, port) .handler(new DiscardClientHandler(firstMessageSize)); diff --git a/example/src/main/java/io/netty/example/discard/DiscardServer.java b/example/src/main/java/io/netty/example/discard/DiscardServer.java index 0c7ac4334f..2b1678c520 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardServer.java +++ b/example/src/main/java/io/netty/example/discard/DiscardServer.java @@ -19,7 +19,7 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** @@ -36,7 +36,7 @@ public class DiscardServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new ChannelInitializer() { diff --git a/example/src/main/java/io/netty/example/echo/EchoClient.java b/example/src/main/java/io/netty/example/echo/EchoClient.java index 39dcc7a5ec..c0853454ae 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClient.java +++ b/example/src/main/java/io/netty/example/echo/EchoClient.java @@ -20,7 +20,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; @@ -49,7 +49,7 @@ public class EchoClient { // Configure the client. Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioSocketChannel()) .option(ChannelOption.TCP_NODELAY, true) .remoteAddress(new InetSocketAddress(host, port)) diff --git a/example/src/main/java/io/netty/example/echo/EchoServer.java b/example/src/main/java/io/netty/example/echo/EchoServer.java index 5c8ecb4139..638ba8bb49 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServer.java +++ b/example/src/main/java/io/netty/example/echo/EchoServer.java @@ -20,7 +20,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; @@ -42,7 +42,7 @@ public class EchoServer { // Configure the server. ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .option(ChannelOption.SO_BACKLOG, 100) .localAddress(new InetSocketAddress(port)) diff --git a/example/src/main/java/io/netty/example/factorial/FactorialClient.java b/example/src/main/java/io/netty/example/factorial/FactorialClient.java index 42bb43da95..6c17c5ec3a 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialClient.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialClient.java @@ -17,7 +17,7 @@ package io.netty.example.factorial; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; /** @@ -39,7 +39,7 @@ public class FactorialClient { public void run() throws Exception { Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioSocketChannel()) .remoteAddress(host, port) .handler(new FactorialClientInitializer(count)); diff --git a/example/src/main/java/io/netty/example/factorial/FactorialServer.java b/example/src/main/java/io/netty/example/factorial/FactorialServer.java index 0e83c6da36..1c68264ef0 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialServer.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialServer.java @@ -16,7 +16,7 @@ package io.netty.example.factorial; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** @@ -34,7 +34,7 @@ public class FactorialServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new FactorialServerInitializer()); diff --git a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java index c5cbf7b66e..f228653d83 100644 --- a/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java +++ b/example/src/main/java/io/netty/example/http/file/HttpStaticFileServer.java @@ -16,7 +16,7 @@ package io.netty.example.http.file; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class HttpStaticFileServer { @@ -30,7 +30,7 @@ public class HttpStaticFileServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new HttpStaticFileServerInitializer()); diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java index 89918eeb7a..bff8050742 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopClient.java @@ -17,7 +17,7 @@ package io.netty.example.http.snoop; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.ClientCookieEncoder; import io.netty.handler.codec.http.DefaultCookie; @@ -64,7 +64,7 @@ public class HttpSnoopClient { // Configure the client. Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioSocketChannel()) .handler(new HttpSnoopClientInitializer(ssl)) .remoteAddress(new InetSocketAddress(host, port)); diff --git a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java index 7879e5930b..aed54afc01 100644 --- a/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java +++ b/example/src/main/java/io/netty/example/http/snoop/HttpSnoopServer.java @@ -17,7 +17,7 @@ package io.netty.example.http.snoop; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; @@ -39,7 +39,7 @@ public class HttpSnoopServer { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .childHandler(new HttpSnoopServerInitializer()) .localAddress(new InetSocketAddress(port)); diff --git a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java index c4151e7f4b..95f42c9a19 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/autobahn/AutobahnServer.java @@ -17,7 +17,7 @@ package io.netty.example.http.websocketx.autobahn; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** @@ -35,7 +35,7 @@ public class AutobahnServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new AutobahnServerInitializer()); diff --git a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java index a5f7be896c..b0d7fe5260 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java +++ b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClient.java @@ -42,7 +42,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpRequestEncoder; import io.netty.handler.codec.http.HttpResponseDecoder; @@ -83,7 +83,7 @@ public class WebSocketClient { new WebSocketClientHandshakerFactory().newHandshaker( uri, WebSocketVersion.V13, null, false, customHeaders); - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioSocketChannel()) .remoteAddress(uri.getHost(), uri.getPort()) .handler(new ChannelInitializer() { diff --git a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java index b9fea68a1c..d57a97f29e 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/server/WebSocketServer.java @@ -17,7 +17,7 @@ package io.netty.example.http.websocketx.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** @@ -50,7 +50,7 @@ public class WebSocketServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new WebSocketServerInitializer()); diff --git a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java index 0f1031f438..b8a8bb4240 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java +++ b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServer.java @@ -17,7 +17,7 @@ package io.netty.example.http.websocketx.sslserver; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** @@ -49,7 +49,7 @@ public class WebSocketSslServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new WebSocketSslServerInitializer()); diff --git a/example/src/main/java/io/netty/example/localecho/LocalEcho.java b/example/src/main/java/io/netty/example/localecho/LocalEcho.java index 40d8814993..db573f2779 100644 --- a/example/src/main/java/io/netty/example/localecho/LocalEcho.java +++ b/example/src/main/java/io/netty/example/localecho/LocalEcho.java @@ -22,9 +22,9 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoop; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; @@ -49,7 +49,7 @@ public class LocalEcho { // Note that we can use any event loop to ensure certain local channels // are handled by the same event loop thread which drives a certain socket channel // to reduce the communication latency between socket channels and local channels. - sb.eventLoop(new LocalEventLoop(), new LocalEventLoop()) + sb.group(new LocalEventLoopGroup(), new LocalEventLoopGroup()) .channel(new LocalServerChannel()) .localAddress(addr) .handler(new ChannelInitializer() { @@ -67,7 +67,7 @@ public class LocalEcho { } }); - cb.eventLoop(new NioEventLoop()) + cb.group(new NioEventLoopGroup()) .channel(new LocalChannel()) .remoteAddress(addr) .handler(new ChannelInitializer() { diff --git a/example/src/main/java/io/netty/example/localtime/LocalTimeClient.java b/example/src/main/java/io/netty/example/localtime/LocalTimeClient.java index ff826be499..4e2a4ad5a1 100644 --- a/example/src/main/java/io/netty/example/localtime/LocalTimeClient.java +++ b/example/src/main/java/io/netty/example/localtime/LocalTimeClient.java @@ -17,7 +17,7 @@ package io.netty.example.localtime; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.ArrayList; @@ -45,7 +45,7 @@ public class LocalTimeClient { public void run() throws Exception { Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioSocketChannel()) .remoteAddress(host, port) .handler(new LocalTimeClientInitializer()); diff --git a/example/src/main/java/io/netty/example/localtime/LocalTimeServer.java b/example/src/main/java/io/netty/example/localtime/LocalTimeServer.java index 36bac2ed37..14411f43fb 100644 --- a/example/src/main/java/io/netty/example/localtime/LocalTimeServer.java +++ b/example/src/main/java/io/netty/example/localtime/LocalTimeServer.java @@ -16,7 +16,7 @@ package io.netty.example.localtime; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** @@ -34,7 +34,7 @@ public class LocalTimeServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new LocalTimeServerInitializer()); diff --git a/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java b/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java index b52adc997a..8ac8a8e712 100644 --- a/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java +++ b/example/src/main/java/io/netty/example/objectecho/ObjectEchoClient.java @@ -18,7 +18,7 @@ package io.netty.example.objectecho; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.example.echo.EchoClient; import io.netty.handler.codec.serialization.ClassResolvers; @@ -43,7 +43,7 @@ public class ObjectEchoClient { public void run() throws Exception { Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioSocketChannel()) .remoteAddress(host, port) .handler(new ChannelInitializer() { diff --git a/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java b/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java index 7b4f060aef..20adddb6d9 100644 --- a/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java +++ b/example/src/main/java/io/netty/example/objectecho/ObjectEchoServer.java @@ -18,7 +18,7 @@ package io.netty.example.objectecho; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.example.echo.EchoServer; import io.netty.handler.codec.serialization.ClassResolvers; @@ -39,7 +39,7 @@ public class ObjectEchoServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new ChannelInitializer() { diff --git a/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java b/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java index 3bf769894a..6037e83a32 100644 --- a/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java +++ b/example/src/main/java/io/netty/example/portunification/PortUnificationServer.java @@ -18,7 +18,7 @@ package io.netty.example.portunification; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** @@ -39,7 +39,7 @@ public class PortUnificationServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new ChannelInitializer() { diff --git a/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java b/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java index 46f7b5c663..f5f2e33512 100644 --- a/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java +++ b/example/src/main/java/io/netty/example/proxy/HexDumpProxy.java @@ -16,7 +16,7 @@ package io.netty.example.proxy; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class HexDumpProxy { @@ -39,7 +39,7 @@ public class HexDumpProxy { // Configure the bootstrap. ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(localPort) .childHandler(new HexDumpProxyInitializer(remoteHost, remotePort)); diff --git a/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java index 202df3d32f..49e7925392 100644 --- a/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java +++ b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java @@ -44,7 +44,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundByteHandlerAdapte // Start the connection attempt. Bootstrap b = new Bootstrap(); - b.eventLoop(inboundChannel.eventLoop()) + b.group(inboundChannel.eventLoop()) .channel(new NioSocketChannel()) .remoteAddress(remoteHost, remotePort) .handler(new HexDumpProxyBackendHandler(inboundChannel)); diff --git a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClient.java b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClient.java index 246882d6a0..aaeb49217b 100644 --- a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClient.java +++ b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentClient.java @@ -21,7 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; @@ -43,7 +43,7 @@ public class QuoteOfTheMomentClient { public void run() throws Exception { Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioDatagramChannel()) .localAddress(new InetSocketAddress(0)) .option(ChannelOption.SO_BROADCAST, true) diff --git a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServer.java b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServer.java index 184e21e58f..cf9abc02b2 100644 --- a/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServer.java +++ b/example/src/main/java/io/netty/example/qotm/QuoteOfTheMomentServer.java @@ -18,7 +18,7 @@ package io.netty.example.qotm; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelOption; import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import java.net.InetSocketAddress; @@ -39,7 +39,7 @@ public class QuoteOfTheMomentServer { public void run() throws Exception { Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioDatagramChannel()) .localAddress(new InetSocketAddress(port)) .option(ChannelOption.SO_BROADCAST, true) diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatClient.java b/example/src/main/java/io/netty/example/securechat/SecureChatClient.java index b913223c9c..76099b1fab 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatClient.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatClient.java @@ -18,7 +18,7 @@ package io.netty.example.securechat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.example.telnet.TelnetClient; @@ -41,7 +41,7 @@ public class SecureChatClient { public void run() throws Exception { Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioSocketChannel()) .remoteAddress(host, port) .handler(new SecureChatClientInitializer()); diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatServer.java b/example/src/main/java/io/netty/example/securechat/SecureChatServer.java index 19a9dac27f..3731eda2df 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatServer.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatServer.java @@ -16,7 +16,7 @@ package io.netty.example.securechat; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.example.telnet.TelnetServer; @@ -34,7 +34,7 @@ public class SecureChatServer { public void run() throws InterruptedException { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new SecureChatServerInitializer()); diff --git a/example/src/main/java/io/netty/example/telnet/TelnetClient.java b/example/src/main/java/io/netty/example/telnet/TelnetClient.java index b0052d62a7..befac65b8a 100644 --- a/example/src/main/java/io/netty/example/telnet/TelnetClient.java +++ b/example/src/main/java/io/netty/example/telnet/TelnetClient.java @@ -18,7 +18,7 @@ package io.netty.example.telnet; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.io.BufferedReader; @@ -40,7 +40,7 @@ public class TelnetClient { public void run() throws Exception { Bootstrap b = new Bootstrap(); try { - b.eventLoop(new NioEventLoop()) + b.group(new NioEventLoopGroup()) .channel(new NioSocketChannel()) .remoteAddress(host, port) .handler(new TelnetClientInitializer()); diff --git a/example/src/main/java/io/netty/example/telnet/TelnetServer.java b/example/src/main/java/io/netty/example/telnet/TelnetServer.java index 5dbc290a2a..784260bf6a 100644 --- a/example/src/main/java/io/netty/example/telnet/TelnetServer.java +++ b/example/src/main/java/io/netty/example/telnet/TelnetServer.java @@ -16,7 +16,7 @@ package io.netty.example.telnet; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** @@ -33,7 +33,7 @@ public class TelnetServer { public void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); try { - b.eventLoop(new NioEventLoop(), new NioEventLoop()) + b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(new NioServerSocketChannel()) .localAddress(port) .childHandler(new TelnetServerPipelineFactory()); diff --git a/example/src/main/java/io/netty/example/uptime/UptimeClient.java b/example/src/main/java/io/netty/example/uptime/UptimeClient.java index 477ea36e86..ea5c434442 100644 --- a/example/src/main/java/io/netty/example/uptime/UptimeClient.java +++ b/example/src/main/java/io/netty/example/uptime/UptimeClient.java @@ -17,9 +17,9 @@ package io.netty.example.uptime; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; @@ -54,11 +54,11 @@ public class UptimeClient { } private Bootstrap configureBootstrap(Bootstrap b) { - return configureBootstrap(b, new NioEventLoop()); + return configureBootstrap(b, new NioEventLoopGroup()); } - Bootstrap configureBootstrap(Bootstrap b, EventLoop l) { - b.eventLoop(l) + Bootstrap configureBootstrap(Bootstrap b, EventLoopGroup g) { + b.group(g) .channel(new NioSocketChannel()) .remoteAddress(host, port) .handler(new ChannelInitializer() { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java index dd8d5c989e..e93e997471 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java @@ -18,15 +18,15 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.socket.InternetProtocolFamily; -import io.netty.channel.socket.aio.AioEventLoop; +import io.netty.channel.socket.aio.AioEventLoopGroup; import io.netty.channel.socket.aio.AioServerSocketChannel; import io.netty.channel.socket.aio.AioSocketChannel; import io.netty.channel.socket.nio.NioDatagramChannel; -import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.oio.OioDatagramChannel; -import io.netty.channel.socket.oio.OioEventLoop; +import io.netty.channel.socket.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioServerSocketChannel; import io.netty.channel.socket.oio.OioSocketChannel; @@ -47,16 +47,16 @@ final class SocketTestPermutation { @Override public ServerBootstrap newInstance() { return new ServerBootstrap(). - eventLoop(new NioEventLoop(), new NioEventLoop()). + group(new NioEventLoopGroup(), new NioEventLoopGroup()). channel(new NioServerSocketChannel()); } }); sbfs.add(new Factory() { @Override public ServerBootstrap newInstance() { - AioEventLoop loop = new AioEventLoop(); + AioEventLoopGroup loop = new AioEventLoopGroup(); return new ServerBootstrap(). - eventLoop(loop, loop). + group(loop, loop). channel(new AioServerSocketChannel(loop)); } }); @@ -64,7 +64,7 @@ final class SocketTestPermutation { @Override public ServerBootstrap newInstance() { return new ServerBootstrap(). - eventLoop(new OioEventLoop(), new OioEventLoop()). + group(new OioEventLoopGroup(), new OioEventLoopGroup()). channel(new OioServerSocketChannel()); } }); @@ -75,20 +75,20 @@ final class SocketTestPermutation { cbfs.add(new Factory() { @Override public Bootstrap newInstance() { - return new Bootstrap().eventLoop(new NioEventLoop()).channel(new NioSocketChannel()); + return new Bootstrap().group(new NioEventLoopGroup()).channel(new NioSocketChannel()); } }); cbfs.add(new Factory() { @Override public Bootstrap newInstance() { - AioEventLoop loop = new AioEventLoop(); - return new Bootstrap().eventLoop(loop).channel(new AioSocketChannel(loop)); + AioEventLoopGroup loop = new AioEventLoopGroup(); + return new Bootstrap().group(loop).channel(new AioSocketChannel(loop)); } }); cbfs.add(new Factory() { @Override public Bootstrap newInstance() { - return new Bootstrap().eventLoop(new OioEventLoop()).channel(new OioSocketChannel()); + return new Bootstrap().group(new OioEventLoopGroup()).channel(new OioSocketChannel()); } }); @@ -132,14 +132,14 @@ final class SocketTestPermutation { bfs.add(new Factory() { @Override public Bootstrap newInstance() { - return new Bootstrap().eventLoop(new NioEventLoop()).channel( + return new Bootstrap().group(new NioEventLoopGroup()).channel( new NioDatagramChannel(InternetProtocolFamily.IPv4)); } }); bfs.add(new Factory() { @Override public Bootstrap newInstance() { - return new Bootstrap().eventLoop(new OioEventLoop()).channel(new OioDatagramChannel()); + return new Bootstrap().group(new OioEventLoopGroup()).channel(new OioDatagramChannel()); } }); diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index 68bf3cf4e6..501ec522f2 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -22,7 +22,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -39,20 +39,20 @@ public class Bootstrap { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class); private final Map, Object> options = new LinkedHashMap, Object>(); - private EventLoop eventLoop; + private EventLoopGroup group; private Channel channel; private ChannelHandler handler; private SocketAddress localAddress; private SocketAddress remoteAddress; - public Bootstrap eventLoop(EventLoop eventLoop) { - if (eventLoop == null) { - throw new NullPointerException("eventLoop"); + public Bootstrap group(EventLoopGroup group) { + if (group == null) { + throw new NullPointerException("group"); } - if (this.eventLoop != null) { - throw new IllegalStateException("eventLoop set already"); + if (this.group != null) { + throw new IllegalStateException("group set already"); } - this.eventLoop = eventLoop; + this.group = group; return this; } @@ -201,7 +201,7 @@ public class Bootstrap { } } - eventLoop.register(channel).syncUninterruptibly(); + group.register(channel).syncUninterruptibly(); } private static boolean ensureOpen(ChannelFuture future) { @@ -215,14 +215,14 @@ public class Bootstrap { } public void shutdown() { - if (eventLoop != null) { - eventLoop.shutdown(); + if (group != null) { + group.shutdown(); } } private void validate() { - if (eventLoop == null) { - throw new IllegalStateException("eventLoop not set"); + if (group == null) { + throw new IllegalStateException("group not set"); } if (channel == null) { throw new IllegalStateException("channel not set"); diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index 0bb1d1d6bb..a66a6f6238 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -28,7 +28,7 @@ import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -56,22 +56,22 @@ public class ServerBootstrap { private final Map, Object> parentOptions = new LinkedHashMap, Object>(); private final Map, Object> childOptions = new LinkedHashMap, Object>(); - private EventLoop parentEventLoop; - private EventLoop childEventLoop; + private EventLoopGroup parentGroup; + private EventLoopGroup childGroup; private ServerChannel channel; private ChannelHandler handler; private ChannelHandler childHandler; private SocketAddress localAddress; - public ServerBootstrap eventLoop(EventLoop parentEventLoop, EventLoop childEventLoop) { - if (parentEventLoop == null) { - throw new NullPointerException("parentEventLoop"); + public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { + if (parentGroup == null) { + throw new NullPointerException("parentGroup"); } - if (this.parentEventLoop != null) { - throw new IllegalStateException("eventLoop set already"); + if (this.parentGroup != null) { + throw new IllegalStateException("parentGroup set already"); } - this.parentEventLoop = parentEventLoop; - this.childEventLoop = childEventLoop; + this.parentGroup = parentGroup; + this.childGroup = childGroup; return this; } @@ -179,7 +179,7 @@ public class ServerBootstrap { } p.addLast(acceptor); - ChannelFuture f = parentEventLoop.register(channel).awaitUninterruptibly(); + ChannelFuture f = parentGroup.register(channel).awaitUninterruptibly(); if (!f.isSuccess()) { future.setFailure(f.cause()); return future; @@ -198,17 +198,17 @@ public class ServerBootstrap { } public void shutdown() { - if (parentEventLoop != null) { - parentEventLoop.shutdown(); + if (parentGroup != null) { + parentGroup.shutdown(); } - if (childEventLoop != null) { - childEventLoop.shutdown(); + if (childGroup != null) { + childGroup.shutdown(); } } private void validate() { - if (parentEventLoop == null) { - throw new IllegalStateException("eventLoop not set"); + if (parentGroup == null) { + throw new IllegalStateException("parentGroup not set"); } if (channel == null) { throw new IllegalStateException("channel not set"); @@ -216,9 +216,9 @@ public class ServerBootstrap { if (childHandler == null) { throw new IllegalStateException("childHandler not set"); } - if (childEventLoop == null) { - logger.warn("childEventLoop is not set. Using eventLoop instead."); - childEventLoop = parentEventLoop; + if (childGroup == null) { + logger.warn("childGroup is not set. Using parentGroup instead."); + childGroup = parentGroup; } if (localAddress == null) { logger.warn("localAddress is not set. Using " + DEFAULT_LOCAL_ADDR + " instead."); @@ -267,7 +267,7 @@ public class ServerBootstrap { } try { - childEventLoop.register(child); + childGroup.register(child); } catch (Throwable t) { logger.warn("Failed to register an accepted channel: " + child, t); } diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index ed114bd670..f812d937ab 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -234,7 +234,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI * @throws NullPointerException * if the specified name or handler is {@code null} */ - ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler); + ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); /** * Appends a {@link ChannelHandler} at the last position of this pipeline. @@ -260,7 +260,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI * @throws NullPointerException * if the specified name or handler is {@code null} */ - ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler); + ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); /** * Inserts a {@link ChannelHandler} before an existing handler of this @@ -294,7 +294,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI * @throws NullPointerException * if the specified baseName, name, or handler is {@code null} */ - ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler); + ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); /** * Inserts a {@link ChannelHandler} after an existing handler of this @@ -328,15 +328,15 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI * @throws NullPointerException * if the specified baseName, name, or handler is {@code null} */ - ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler); + ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); ChannelPipeline addFirst(ChannelHandler... handlers); - ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers); + ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); ChannelPipeline addLast(ChannelHandler... handlers); - ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers); + ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); /** * Removes the specified {@link ChannelHandler} from this pipeline. diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 333454a2af..30324f1df5 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.EnumSet; import java.util.Queue; import java.util.Set; - import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -149,7 +148,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @SuppressWarnings("unchecked") DefaultChannelHandlerContext( - DefaultChannelPipeline pipeline, EventExecutor executor, + DefaultChannelPipeline pipeline, EventExecutorGroup group, DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next, String name, ChannelHandler handler) { @@ -188,19 +187,19 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements this.name = name; this.handler = handler; - if (executor != null) { + if (group != null) { // Pin one of the child executors once and remember it so that the same child executor // is used to fire events for the same channel. - EventExecutor childExecutor = pipeline.childExecutors.get(executor); + EventExecutor childExecutor = pipeline.childExecutors.get(group); if (childExecutor == null) { - childExecutor = executor.unsafe().nextChild(); - pipeline.childExecutors.put(executor, childExecutor); + childExecutor = group.next(); + pipeline.childExecutors.put(group, childExecutor); } - this.executor = childExecutor; + executor = childExecutor; } else if (channel.isRegistered()) { - this.executor = channel.eventLoop(); + executor = channel.eventLoop(); } else { - this.executor = null; + executor = null; } if (type.contains(ChannelHandlerType.INBOUND)) { @@ -805,6 +804,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public void readable(boolean readable) { - this.pipeline.readable(this, readable); + pipeline.readable(this, readable); } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index bdcc992a7d..9228fc5d4f 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -55,8 +55,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { private boolean firedChannelActive; private boolean fireInboundBufferUpdatedOnActivation; - final Map childExecutors = - new IdentityHashMap(); + final Map childExecutors = + new IdentityHashMap(); private final AtomicInteger suspendRead = new AtomicInteger(); public DefaultChannelPipeline(Channel channel) { @@ -84,7 +84,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addFirst(EventExecutor executor, final String name, final ChannelHandler handler) { + public ChannelPipeline addFirst(EventExecutorGroup group, final String name, final ChannelHandler handler) { try { Future future; @@ -92,7 +92,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { checkDuplicateName(name); final DefaultChannelHandlerContext nextCtx = head.next; final DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, executor, head, nextCtx, name, handler); + new DefaultChannelHandlerContext(this, group, head, nextCtx, name, handler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { addFirst0(name, nextCtx, newCtx); @@ -143,7 +143,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addLast(EventExecutor executor, final String name, final ChannelHandler handler) { + public ChannelPipeline addLast(EventExecutorGroup group, final String name, final ChannelHandler handler) { try { Future future; @@ -152,7 +152,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { final DefaultChannelHandlerContext oldTail = tail; final DefaultChannelHandlerContext newTail = - new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler); + new DefaultChannelHandlerContext(this, group, oldTail, null, name, handler); if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) { addLast0(name, oldTail, newTail); @@ -203,7 +203,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addBefore( - EventExecutor executor, String baseName, final String name, final ChannelHandler handler) { + EventExecutorGroup group, String baseName, final String name, final ChannelHandler handler) { try { Future future; @@ -211,7 +211,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { final DefaultChannelHandlerContext ctx = getContextOrDie(baseName); checkDuplicateName(name); final DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler); + new DefaultChannelHandlerContext(this, group, ctx.prev, ctx, name, handler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { addBefore0(name, ctx, newCtx); @@ -262,7 +262,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addAfter( - EventExecutor executor, String baseName, final String name, final ChannelHandler handler) { + EventExecutorGroup group, String baseName, final String name, final ChannelHandler handler) { try { Future future; @@ -274,7 +274,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } checkDuplicateName(name); final DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, executor, ctx, ctx.next, name, handler); + new DefaultChannelHandlerContext(this, group, ctx, ctx.next, name, handler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { addAfter0(name, ctx, newCtx); @@ -325,7 +325,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers) { + public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } @@ -354,7 +354,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers) { + public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } diff --git a/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java b/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java index a346093e4e..33c6993e00 100644 --- a/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventExecutor.java @@ -17,18 +17,33 @@ package io.netty.channel; import java.util.concurrent.ThreadFactory; -public class DefaultEventExecutor extends MultithreadEventExecutor { +class DefaultEventExecutor extends SingleThreadEventExecutor { - public DefaultEventExecutor(int nThreads) { - this(nThreads, null); - } - - public DefaultEventExecutor(int nThreads, ThreadFactory threadFactory) { - super(nThreads, threadFactory); + DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) { + super(parent, threadFactory); } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new DefaultChildEventExecutor(threadFactory); + protected void run() { + for (;;) { + Runnable task; + try { + task = takeTask(); + task.run(); + } catch (InterruptedException e) { + // Waken up by interruptThread() + } + + if (isShutdown() && peekTask() == null) { + break; + } + } + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (!inEventLoop && isShutdown()) { + interruptThread(); + } } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChildEventExecutor.java b/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java similarity index 52% rename from transport/src/main/java/io/netty/channel/DefaultChildEventExecutor.java rename to transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java index f28a7a3037..424c095a41 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChildEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventExecutorGroup.java @@ -17,33 +17,18 @@ package io.netty.channel; import java.util.concurrent.ThreadFactory; -class DefaultChildEventExecutor extends SingleThreadEventExecutor { +public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup { - DefaultChildEventExecutor(ThreadFactory threadFactory) { - super(threadFactory); + public DefaultEventExecutorGroup(int nThreads) { + this(nThreads, null); + } + + public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) { + super(nThreads, threadFactory); } @Override - protected void run() { - for (;;) { - Runnable task; - try { - task = takeTask(); - task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() - } - - if (isShutdown() && peekTask() == null) { - break; - } - } - } - - @Override - protected void wakeup(boolean inEventLoop) { - if (!inEventLoop && isShutdown()) { - interruptThread(); - } + protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + return new DefaultEventExecutor(this, threadFactory); } } diff --git a/transport/src/main/java/io/netty/channel/EventExecutor.java b/transport/src/main/java/io/netty/channel/EventExecutor.java index eb96c88122..27d200737c 100644 --- a/transport/src/main/java/io/netty/channel/EventExecutor.java +++ b/transport/src/main/java/io/netty/channel/EventExecutor.java @@ -17,12 +17,8 @@ package io.netty.channel; import java.util.concurrent.ScheduledExecutorService; -public interface EventExecutor extends ScheduledExecutorService { +public interface EventExecutor extends EventExecutorGroup, ScheduledExecutorService { + EventExecutorGroup parent(); boolean inEventLoop(); boolean inEventLoop(Thread thread); - Unsafe unsafe(); - - interface Unsafe { - EventExecutor nextChild(); - } } diff --git a/transport/src/main/java/io/netty/channel/EventExecutorGroup.java b/transport/src/main/java/io/netty/channel/EventExecutorGroup.java new file mode 100644 index 0000000000..c7d5c72618 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/EventExecutorGroup.java @@ -0,0 +1,57 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public interface EventExecutorGroup { + + /** + * Returns one of the {@link EventExecutor}s that belong to this group. + */ + EventExecutor next(); + + /** + * Shuts down all {@link EventExecutor}s managed by this group. + * + * @see ExecutorService#shutdown() + */ + void shutdown(); + + /** + * Returns {@code true} if and only if {@link #shutdown()} has been called. + * + * @see ExecutorService#isShutdown() + */ + boolean isShutdown(); + + /** + * Returns {@code true} if and only if {@link #shutdown()} has been called and all + * {@link EventExecutor}s managed by this group has been terminated completely. + * + * @see ExecutorService#isTerminated() + */ + boolean isTerminated(); + + /** + * Waits until {@link #isTerminated()} returns {@code true} or the specified amount of time + * passes. + * + * @see ExecutorService#awaitTermination(long, TimeUnit) + */ + boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; +} diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java index be4acab7f2..9aa6726b41 100644 --- a/transport/src/main/java/io/netty/channel/EventLoop.java +++ b/transport/src/main/java/io/netty/channel/EventLoop.java @@ -15,7 +15,7 @@ */ package io.netty.channel; -public interface EventLoop extends EventExecutor { - ChannelFuture register(Channel channel); - ChannelFuture register(Channel channel, ChannelFuture future); +public interface EventLoop extends EventExecutor, EventLoopGroup { + @Override + EventLoopGroup parent(); } diff --git a/transport/src/main/java/io/netty/channel/EventLoopGroup.java b/transport/src/main/java/io/netty/channel/EventLoopGroup.java new file mode 100644 index 0000000000..48fcd5cd3f --- /dev/null +++ b/transport/src/main/java/io/netty/channel/EventLoopGroup.java @@ -0,0 +1,24 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +public interface EventLoopGroup extends EventExecutorGroup { + @Override + EventLoop next(); + + ChannelFuture register(Channel channel); + ChannelFuture register(Channel channel, ChannelFuture future); +} diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java b/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java similarity index 52% rename from transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java rename to transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java index 1d5208eccf..8622a1e177 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventExecutorGroup.java @@ -15,33 +15,19 @@ */ package io.netty.channel; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -public abstract class MultithreadEventExecutor implements EventExecutor { +public abstract class MultithreadEventExecutorGroup implements EventExecutorGroup { private static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; private static final AtomicInteger poolId = new AtomicInteger(); private final EventExecutor[] children; private final AtomicInteger childIndex = new AtomicInteger(); - private final Unsafe unsafe = new Unsafe() { - @Override - public EventExecutor nextChild() { - return children[Math.abs(childIndex.getAndIncrement() % children.length)]; - } - }; - protected MultithreadEventExecutor(int nThreads, ThreadFactory threadFactory, Object... args) { + protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { if (nThreads < 0) { throw new IllegalArgumentException(String.format( "nThreads: %d (expected: >= 0)", nThreads)); @@ -72,13 +58,13 @@ public abstract class MultithreadEventExecutor implements EventExecutor { } } - protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception; - @Override - public Unsafe unsafe() { - return unsafe; + public EventExecutor next() { + return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } + protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception; + @Override public void shutdown() { for (EventExecutor l: children) { @@ -86,14 +72,6 @@ public abstract class MultithreadEventExecutor implements EventExecutor { } } - @Override - public List shutdownNow() { - for (EventExecutor l: children) { - l.shutdownNow(); - } - return Collections.emptyList(); - } - @Override public boolean isShutdown() { for (EventExecutor l: children) { @@ -132,97 +110,12 @@ public abstract class MultithreadEventExecutor implements EventExecutor { return isTerminated(); } - @Override - public Future submit(Callable task) { - return currentEventLoop().submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - return currentEventLoop().submit(task, result); - } - - @Override - public Future submit(Runnable task) { - return currentEventLoop().submit(task); - } - - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - return currentEventLoop().invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return currentEventLoop().invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { - return currentEventLoop().invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, - long timeout, TimeUnit unit) throws InterruptedException, - ExecutionException, TimeoutException { - return currentEventLoop().invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - currentEventLoop().execute(command); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { - return currentEventLoop().schedule(command, delay, unit); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return currentEventLoop().schedule(callable, delay, unit); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - @Override - public boolean inEventLoop() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean inEventLoop(Thread thread) { - throw new UnsupportedOperationException(); - } - - private static EventExecutor currentEventLoop() { - EventExecutor loop = SingleThreadEventExecutor.currentEventLoop(); - if (loop == null) { - throw new IllegalStateException("not called from an event loop thread"); - } - return loop; - } - private final class DefaultThreadFactory implements ThreadFactory { private final AtomicInteger nextId = new AtomicInteger(); private final String prefix; DefaultThreadFactory() { - String typeName = MultithreadEventExecutor.this.getClass().getSimpleName(); + String typeName = MultithreadEventExecutorGroup.this.getClass().getSimpleName(); typeName = "" + Character.toLowerCase(typeName.charAt(0)) + typeName.substring(1); prefix = typeName + '-' + poolId.incrementAndGet() + '-'; } diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java similarity index 71% rename from transport/src/main/java/io/netty/channel/MultithreadEventLoop.java rename to transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java index 3a0415f810..38d7b7e9f0 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java @@ -17,20 +17,25 @@ package io.netty.channel; import java.util.concurrent.ThreadFactory; -public abstract class MultithreadEventLoop extends MultithreadEventExecutor implements EventLoop { +public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { - protected MultithreadEventLoop(int nThreads, ThreadFactory threadFactory, + protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads, threadFactory, args); } + @Override + public EventLoop next() { + return (EventLoop) super.next(); + } + @Override public ChannelFuture register(Channel channel) { - return ((EventLoop) unsafe().nextChild()).register(channel); + return next().register(channel); } @Override public ChannelFuture register(Channel channel, ChannelFuture future) { - return ((EventLoop) unsafe().nextChild()).register(channel, future); + return next().register(channel, future); } } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java index ca066cb2a2..f1321362bb 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -64,13 +64,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService return nanoTime() + delay; } - private final Unsafe unsafe = new Unsafe() { - @Override - public EventExecutor nextChild() { - return SingleThreadEventExecutor.this; - } - }; - + private final EventExecutorGroup parent; private final BlockingQueue taskQueue = new LinkedBlockingQueue(); private final Thread thread; private final Object stateLock = new Object(); @@ -83,7 +77,13 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService private long lastCheckTimeNanos; private long lastPurgeTimeNanos; - protected SingleThreadEventExecutor(ThreadFactory threadFactory) { + protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) { + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + + this.parent = parent; + thread = threadFactory.newThread(new Runnable() { @Override public void run() { @@ -127,8 +127,13 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService } @Override - public Unsafe unsafe() { - return unsafe; + public EventExecutorGroup parent() { + return parent; + } + + @Override + public EventExecutor next() { + return this; } protected void interruptThread() { diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index f02417a341..c4d4c6b9fd 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -19,8 +19,18 @@ import java.util.concurrent.ThreadFactory; public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { - protected SingleThreadEventLoop(ThreadFactory threadFactory) { - super(threadFactory); + protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) { + super(parent, threadFactory); + } + + @Override + public EventLoopGroup parent() { + return (EventLoopGroup) super.parent(); + } + + @Override + public EventLoop next() { + return (EventLoop) super.next(); } @Override diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 667a461752..a5dc0aa0b6 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -17,8 +17,8 @@ package io.netty.channel.embedded; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.EventExecutor; import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; import java.util.Collections; import java.util.List; @@ -27,8 +27,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -class EmbeddedEventLoop extends AbstractExecutorService implements - EventLoop, EventExecutor.Unsafe { +class EmbeddedEventLoop extends AbstractExecutorService implements EventLoop { @Override public ScheduledFuture schedule(Runnable command, long delay, @@ -108,12 +107,12 @@ class EmbeddedEventLoop extends AbstractExecutorService implements } @Override - public Unsafe unsafe() { + public EventLoop next() { return this; } @Override - public EventExecutor nextChild() { + public EventLoopGroup parent() { return this; } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java deleted file mode 100644 index 31d0145a94..0000000000 --- a/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.channel.SingleThreadEventLoop; - -import java.util.concurrent.ThreadFactory; - -final class LocalChildEventLoop extends SingleThreadEventLoop { - - LocalChildEventLoop(ThreadFactory threadFactory) { - super(threadFactory); - } - - @Override - protected void run() { - for (;;) { - Runnable task; - try { - task = takeTask(); - task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() - } - - if (isShutdown()) { - task = pollTask(); - if (task == null) { - break; - } - task.run(); - } - } - } - - @Override - protected void wakeup(boolean inEventLoop) { - if (!inEventLoop && isShutdown()) { - interruptThread(); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java index a2ea6eb4eb..10047a86d5 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java @@ -15,27 +15,41 @@ */ package io.netty.channel.local; -import io.netty.channel.EventExecutor; -import io.netty.channel.MultithreadEventLoop; +import io.netty.channel.SingleThreadEventLoop; import java.util.concurrent.ThreadFactory; -public class LocalEventLoop extends MultithreadEventLoop { +final class LocalEventLoop extends SingleThreadEventLoop { - public LocalEventLoop() { - this(0); - } - - public LocalEventLoop(int nThreads) { - this(nThreads, null); - } - - public LocalEventLoop(int nThreads, ThreadFactory threadFactory) { - super(nThreads, threadFactory); + LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) { + super(parent, threadFactory); } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new LocalChildEventLoop(threadFactory); + protected void run() { + for (;;) { + Runnable task; + try { + task = takeTask(); + task.run(); + } catch (InterruptedException e) { + // Waken up by interruptThread() + } + + if (isShutdown()) { + task = pollTask(); + if (task == null) { + break; + } + task.run(); + } + } + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (!inEventLoop && isShutdown()) { + interruptThread(); + } } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java new file mode 100644 index 0000000000..3d6dba41eb --- /dev/null +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java @@ -0,0 +1,41 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.local; + +import io.netty.channel.EventExecutor; +import io.netty.channel.MultithreadEventLoopGroup; + +import java.util.concurrent.ThreadFactory; + +public class LocalEventLoopGroup extends MultithreadEventLoopGroup { + + public LocalEventLoopGroup() { + this(0); + } + + public LocalEventLoopGroup(int nThreads) { + this(nThreads, null); + } + + public LocalEventLoopGroup(int nThreads, ThreadFactory threadFactory) { + super(nThreads, threadFactory); + } + + @Override + protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + return new LocalEventLoop(this, threadFactory); + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index ea2d7a3504..bbb000b15f 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit; abstract class AbstractAioChannel extends AbstractChannel { - protected final AioEventLoop eventLoop; + protected final AioEventLoopGroup group; private final AsynchronousChannel ch; /** @@ -41,10 +41,10 @@ abstract class AbstractAioChannel extends AbstractChannel { protected ScheduledFuture connectTimeoutFuture; private ConnectException connectTimeoutException; - protected AbstractAioChannel(Channel parent, Integer id, AioEventLoop eventLoop, AsynchronousChannel ch) { + protected AbstractAioChannel(Channel parent, Integer id, AioEventLoopGroup group, AsynchronousChannel ch) { super(parent, id); this.ch = ch; - this.eventLoop = eventLoop; + this.group = group; } @Override @@ -68,10 +68,10 @@ abstract class AbstractAioChannel extends AbstractChannel { @Override protected Runnable doRegister() throws Exception { - if (((AioChildEventLoop) eventLoop()).parent != eventLoop) { + if (((AioChildEventLoop) eventLoop()).parent() != group) { throw new ChannelException( getClass().getSimpleName() + " must be registered to the " + - AioEventLoop.class.getSimpleName() + " which was specified in the constructor."); + AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor."); } return null; } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java index fcadb4306f..cae4ea1469 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java @@ -21,11 +21,8 @@ import java.util.concurrent.ThreadFactory; final class AioChildEventLoop extends SingleThreadEventLoop { - final AioEventLoop parent; - - AioChildEventLoop(AioEventLoop parent, ThreadFactory threadFactory) { - super(threadFactory); - this.parent = parent; + AioChildEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) { + super(parent, threadFactory); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java similarity index 71% rename from transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java rename to transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java index 2a747f9b08..2abaa66a8b 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java @@ -17,49 +17,48 @@ package io.netty.channel.socket.aio; import io.netty.channel.EventExecutor; import io.netty.channel.EventLoopException; -import io.netty.channel.MultithreadEventLoop; +import io.netty.channel.MultithreadEventLoopGroup; import java.io.IOException; import java.lang.reflect.Field; import java.nio.channels.AsynchronousChannelGroup; import java.util.ArrayDeque; +import java.util.Collections; import java.util.Deque; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; -public class AioEventLoop extends MultithreadEventLoop { +public class AioEventLoopGroup extends MultithreadEventLoopGroup { private static final ConcurrentMap, Field[]> fieldCache = new ConcurrentHashMap, Field[]>(); private static final Field[] FAILURE = new Field[0]; final AsynchronousChannelGroup group; - public AioEventLoop() { + public AioEventLoopGroup() { this(0); } - public AioEventLoop(int nThreads) { + public AioEventLoopGroup(int nThreads) { this(nThreads, null); } - public AioEventLoop(int nThreads, ThreadFactory threadFactory) { + public AioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { super(nThreads, threadFactory); try { - group = AsynchronousChannelGroup.withThreadPool(this); + group = AsynchronousChannelGroup.withThreadPool(new AioExecutorService()); } catch (IOException e) { throw new EventLoopException("Failed to create an AsynchronousChannelGroup", e); } } @Override - public void execute(Runnable command) { - Class commandType = command.getClass(); - if (commandType.getName().startsWith("sun.nio.ch.")) { - executeAioTask(command); - } else { - super.execute(command); - } + protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + return new AioChildEventLoop(this, threadFactory); } private void executeAioTask(Runnable command) { @@ -74,7 +73,7 @@ public class AioEventLoop extends MultithreadEventLoop { if (ch != null) { l = ch.eventLoop(); } else { - l = unsafe().nextChild(); + l = next(); } if (l.isShutdown()) { @@ -146,8 +145,42 @@ public class AioEventLoop extends MultithreadEventLoop { return null; } - @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new AioChildEventLoop(this, threadFactory); + private final class AioExecutorService extends AbstractExecutorService { + + @Override + public void shutdown() { + AioEventLoopGroup.this.shutdown(); + } + + @Override + public List shutdownNow() { + AioEventLoopGroup.this.shutdown(); + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return AioEventLoopGroup.this.isShutdown(); + } + + @Override + public boolean isTerminated() { + return AioEventLoopGroup.this.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return AioEventLoopGroup.this.awaitTermination(timeout, unit); + } + + @Override + public void execute(Runnable command) { + Class commandType = command.getClass(); + if (commandType.getName().startsWith("sun.nio.ch.")) { + executeAioTask(command); + } else { + next().execute(command); + } + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index 7563057ca2..d7eca8dc7a 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -60,7 +60,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server } } - public AioServerSocketChannel(AioEventLoop eventLoop) { + public AioServerSocketChannel(AioEventLoopGroup eventLoop) { super(null, null, eventLoop, newSocket(eventLoop.group)); config = new AioServerSocketChannelConfig(javaChannel()); } @@ -147,7 +147,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server // create the socket add it to the buffer and fire the event channel.pipeline().inboundMessageBuffer().add( - new AioSocketChannel(channel, null, channel.eventLoop, ch)); + new AioSocketChannel(channel, null, channel.group, ch)); if (!channel.readSuspended.get()) { channel.pipeline().fireInboundBufferUpdated(); } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 16a44c6c77..883b020a26 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -63,13 +63,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } }; - public AioSocketChannel(AioEventLoop eventLoop) { + public AioSocketChannel(AioEventLoopGroup eventLoop) { this(null, null, eventLoop, newSocket(eventLoop.group)); } AioSocketChannel( AioServerSocketChannel parent, Integer id, - AioEventLoop eventLoop, AsynchronousSocketChannel ch) { + AioEventLoopGroup eventLoop, AsynchronousSocketChannel ch) { super(parent, id, eventLoop, ch); config = new AioSocketChannelConfig(ch); } @@ -375,7 +375,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne if (eventLoop().inEventLoop()) { beginRead(); } else { - eventLoop.execute(readTask); + eventLoop().execute(readTask); } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 5d2f62616d..0e82baf1be 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -191,7 +191,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected boolean isCompatible(EventLoop loop) { - return loop instanceof NioChildEventLoop; + return loop instanceof NioEventLoop; } @Override @@ -202,7 +202,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected Runnable doRegister() throws Exception { - NioChildEventLoop loop = (NioChildEventLoop) eventLoop(); + NioEventLoop loop = (NioEventLoop) eventLoop(); selectionKey = javaChannel().register( loop.selector, isActive()? defaultInterestOps : 0, this); return null; @@ -210,7 +210,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected void doDeregister() throws Exception { - ((NioChildEventLoop) eventLoop()).cancel(selectionKey()); + ((NioEventLoop) eventLoop()).cancel(selectionKey()); } protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java deleted file mode 100644 index f2c5d19b6b..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioChildEventLoop.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.SingleThreadEventLoop; -import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - -import java.io.IOException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.SelectorProvider; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicBoolean; - -final class NioChildEventLoop extends SingleThreadEventLoop { - - /** - * Internal Netty logger. - */ - protected static final InternalLogger logger = InternalLoggerFactory - .getInstance(NioChildEventLoop.class); - - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - - /** - * The NIO {@link Selector}. - */ - protected final Selector selector; - - /** - * Boolean that controls determines if a blocked Selector.select should - * break out of its selection process. In our case we use a timeone for - * the select method and the select method will block for that time unless - * waken up. - */ - protected final AtomicBoolean wakenUp = new AtomicBoolean(); - - private int cancelledKeys; - private boolean cleanedCancelledKeys; - - NioChildEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) { - super(threadFactory); - if (selectorProvider == null) { - throw new NullPointerException("selectorProvider"); - } - selector = openSelector(selectorProvider); - } - - private static Selector openSelector(SelectorProvider provider) { - try { - return provider.openSelector(); - } catch (IOException e) { - throw new ChannelException("failed to open a new selector", e); - } - } - - @Override - protected void run() { - Selector selector = this.selector; - for (;;) { - - wakenUp.set(false); - - try { - SelectorUtil.select(selector); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - selector.wakeup(); - } - - cancelledKeys = 0; - runAllTasks(); - processSelectedKeys(); - - if (isShutdown()) { - closeAll(); - if (peekTask() == null) { - break; - } - } - } catch (Throwable t) { - logger.warn( - "Unexpected exception in the selector loop.", t); - - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } - } - - @Override - protected void cleanup() { - try { - selector.close(); - } catch (IOException e) { - logger.warn( - "Failed to close a selector.", e); - } - } - - void cancel(SelectionKey key) { - key.cancel(); - cancelledKeys ++; - if (cancelledKeys >= CLEANUP_INTERVAL) { - cancelledKeys = 0; - cleanedCancelledKeys = true; - SelectorUtil.cleanupKeys(selector); - } - } - - private void processSelectedKeys() { - Set selectedKeys = selector.selectedKeys(); - if (selectedKeys.isEmpty()) { - return; - } - - Iterator i; - cleanedCancelledKeys = false; - boolean clearSelectedKeys = true; - try { - for (i = selectedKeys.iterator(); i.hasNext();) { - final SelectionKey k = i.next(); - final AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); - final NioUnsafe unsafe = ch.unsafe(); - try { - int readyOps = k.readyOps(); - if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { - unsafe.read(); - if (!ch.isOpen()) { - // Connection already closed - no need to handle write. - continue; - } - } - if ((readyOps & SelectionKey.OP_WRITE) != 0) { - unsafe.flushNow(); - } - if ((readyOps & SelectionKey.OP_CONNECT) != 0) { - unsafe.finishConnect(); - } - } catch (CancelledKeyException ignored) { - unsafe.close(unsafe.voidFuture()); - } - - if (cleanedCancelledKeys) { - // Create the iterator again to avoid ConcurrentModificationException - if (selectedKeys.isEmpty()) { - clearSelectedKeys = false; - break; - } else { - i = selectedKeys.iterator(); - } - } - } - } finally { - if (clearSelectedKeys) { - selectedKeys.clear(); - } - } - } - - private void closeAll() { - SelectorUtil.cleanupKeys(selector); - Set keys = selector.keys(); - Collection channels = new ArrayList(keys.size()); - for (SelectionKey k: keys) { - channels.add((Channel) k.attachment()); - } - - for (Channel ch: channels) { - ch.unsafe().close(ch.unsafe().voidFuture()); - } - } - - @Override - protected void wakeup(boolean inEventLoop) { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java index 1c0b21c3bd..4775a2d91b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java @@ -15,38 +15,221 @@ */ package io.netty.channel.socket.nio; -import io.netty.channel.EventExecutor; -import io.netty.channel.MultithreadEventLoop; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import java.io.IOException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; -public class NioEventLoop extends MultithreadEventLoop { +final class NioEventLoop extends SingleThreadEventLoop { - public NioEventLoop() { - this(0); + /** + * Internal Netty logger. + */ + protected static final InternalLogger logger = InternalLoggerFactory + .getInstance(NioEventLoop.class); + + static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. + + /** + * The NIO {@link Selector}. + */ + protected final Selector selector; + + /** + * Boolean that controls determines if a blocked Selector.select should + * break out of its selection process. In our case we use a timeone for + * the select method and the select method will block for that time unless + * waken up. + */ + protected final AtomicBoolean wakenUp = new AtomicBoolean(); + + private int cancelledKeys; + private boolean cleanedCancelledKeys; + + NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { + super(parent, threadFactory); + if (selectorProvider == null) { + throw new NullPointerException("selectorProvider"); + } + selector = openSelector(selectorProvider); } - public NioEventLoop(int nThreads) { - this(nThreads, null); - } - - public NioEventLoop(int nThreads, ThreadFactory threadFactory) { - super(nThreads, threadFactory); - } - - public NioEventLoop(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { - super(nThreads, threadFactory, selectorProvider); + private static Selector openSelector(SelectorProvider provider) { + try { + return provider.openSelector(); + } catch (IOException e) { + throw new ChannelException("failed to open a new selector", e); + } } @Override - protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - SelectorProvider selectorProvider; - if (args == null || args.length == 0 || args[0] == null) { - selectorProvider = SelectorProvider.provider(); - } else { - selectorProvider = (SelectorProvider) args[0]; + protected void run() { + Selector selector = this.selector; + for (;;) { + + wakenUp.set(false); + + try { + SelectorUtil.select(selector); + + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). + + if (wakenUp.get()) { + selector.wakeup(); + } + + cancelledKeys = 0; + runAllTasks(); + processSelectedKeys(); + + if (isShutdown()) { + closeAll(); + if (peekTask() == null) { + break; + } + } + } catch (Throwable t) { + logger.warn( + "Unexpected exception in the selector loop.", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } + } + } + } + + @Override + protected void cleanup() { + try { + selector.close(); + } catch (IOException e) { + logger.warn( + "Failed to close a selector.", e); + } + } + + void cancel(SelectionKey key) { + key.cancel(); + cancelledKeys ++; + if (cancelledKeys >= CLEANUP_INTERVAL) { + cancelledKeys = 0; + cleanedCancelledKeys = true; + SelectorUtil.cleanupKeys(selector); + } + } + + private void processSelectedKeys() { + Set selectedKeys = selector.selectedKeys(); + if (selectedKeys.isEmpty()) { + return; + } + + Iterator i; + cleanedCancelledKeys = false; + boolean clearSelectedKeys = true; + try { + for (i = selectedKeys.iterator(); i.hasNext();) { + final SelectionKey k = i.next(); + final AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); + final NioUnsafe unsafe = ch.unsafe(); + try { + int readyOps = k.readyOps(); + if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { + unsafe.read(); + if (!ch.isOpen()) { + // Connection already closed - no need to handle write. + continue; + } + } + if ((readyOps & SelectionKey.OP_WRITE) != 0) { + unsafe.flushNow(); + } + if ((readyOps & SelectionKey.OP_CONNECT) != 0) { + unsafe.finishConnect(); + } + } catch (CancelledKeyException ignored) { + unsafe.close(unsafe.voidFuture()); + } + + if (cleanedCancelledKeys) { + // Create the iterator again to avoid ConcurrentModificationException + if (selectedKeys.isEmpty()) { + clearSelectedKeys = false; + break; + } else { + i = selectedKeys.iterator(); + } + } + } + } finally { + if (clearSelectedKeys) { + selectedKeys.clear(); + } + } + } + + private void closeAll() { + SelectorUtil.cleanupKeys(selector); + Set keys = selector.keys(); + Collection channels = new ArrayList(keys.size()); + for (SelectionKey k: keys) { + channels.add((Channel) k.attachment()); + } + + for (Channel ch: channels) { + ch.unsafe().close(ch.unsafe().voidFuture()); + } + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); } - return new NioChildEventLoop(threadFactory, selectorProvider); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java new file mode 100644 index 0000000000..48f6c98fbc --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoopGroup.java @@ -0,0 +1,52 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import io.netty.channel.EventExecutor; +import io.netty.channel.MultithreadEventLoopGroup; + +import java.nio.channels.spi.SelectorProvider; +import java.util.concurrent.ThreadFactory; + +public class NioEventLoopGroup extends MultithreadEventLoopGroup { + + public NioEventLoopGroup() { + this(0); + } + + public NioEventLoopGroup(int nThreads) { + this(nThreads, null); + } + + public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { + super(nThreads, threadFactory); + } + + public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { + super(nThreads, threadFactory, selectorProvider); + } + + @Override + protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { + SelectorProvider selectorProvider; + if (args == null || args.length == 0 || args[0] == null) { + selectorProvider = SelectorProvider.provider(); + } else { + selectorProvider = (SelectorProvider) args[0]; + } + return new NioEventLoop(this, threadFactory, selectorProvider); + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java index 7985d72bf9..647919f234 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -86,7 +86,7 @@ abstract class AbstractOioChannel extends AbstractChannel { @Override protected boolean isCompatible(EventLoop loop) { - return loop instanceof OioChildEventLoop; + return loop instanceof OioEventLoop; } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java deleted file mode 100644 index 917885e78d..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.oio; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.SingleThreadEventLoop; - - -class OioChildEventLoop extends SingleThreadEventLoop { - - private final OioEventLoop parent; - private AbstractOioChannel ch; - - OioChildEventLoop(OioEventLoop parent) { - super(parent.threadFactory); - this.parent = parent; - } - - @Override - public ChannelFuture register(Channel channel, ChannelFuture future) { - return super.register(channel, future).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - ch = (AbstractOioChannel) future.channel(); - } else { - deregister(); - } - } - }); - } - - @Override - protected void run() { - for (;;) { - AbstractOioChannel ch = this.ch; - if (ch == null || !ch.isActive()) { - Runnable task; - try { - task = takeTask(); - task.run(); - } catch (InterruptedException e) { - // Waken up by interruptThread() - } - } else { - long startTime = System.nanoTime(); - for (;;) { - final Runnable task = pollTask(); - if (task == null) { - break; - } - - task.run(); - - // Ensure running tasks doesn't take too much time. - if (System.nanoTime() - startTime > AbstractOioChannel.SO_TIMEOUT * 1000000L) { - break; - } - } - - ch.unsafe().read(); - - // Handle deregistration - if (!ch.isRegistered()) { - runAllTasks(); - deregister(); - } - } - - if (isShutdown()) { - if (ch != null) { - ch.unsafe().close(ch.unsafe().voidFuture()); - } - if (peekTask() == null) { - break; - } - } - } - } - - @Override - protected void wakeup(boolean inEventLoop) { - interruptThread(); - } - - private void deregister() { - ch = null; - parent.activeChildren.remove(this); - parent.idleChildren.add(this); - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java index bca22c8119..7049f7080f 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java @@ -15,273 +15,92 @@ */ package io.netty.channel.socket.oio; - import io.netty.channel.Channel; -import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; -import io.netty.channel.EventExecutor; -import io.netty.channel.EventLoop; -import io.netty.channel.SingleThreadEventExecutor; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.SingleThreadEventLoop; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -public class OioEventLoop implements EventLoop { +class OioEventLoop extends SingleThreadEventLoop { - private final int maxChannels; - final ThreadFactory threadFactory; - final Set activeChildren = Collections.newSetFromMap( - new ConcurrentHashMap()); - final Queue idleChildren = new ConcurrentLinkedQueue(); - private final ChannelException tooManyChannels; - private final Unsafe unsafe = new Unsafe() { - @Override - public EventExecutor nextChild() { - throw new UnsupportedOperationException(); - } - }; + private final OioEventLoopGroup parent; + private AbstractOioChannel ch; - public OioEventLoop() { - this(0); - } - - public OioEventLoop(int maxChannels) { - this(maxChannels, Executors.defaultThreadFactory()); - } - - public OioEventLoop(int maxChannels, ThreadFactory threadFactory) { - if (maxChannels < 0) { - throw new IllegalArgumentException(String.format( - "maxChannels: %d (expected: >= 0)", maxChannels)); - } - if (threadFactory == null) { - throw new NullPointerException("threadFactory"); - } - - this.maxChannels = maxChannels; - this.threadFactory = threadFactory; - - tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')'); - tooManyChannels.setStackTrace(new StackTraceElement[0]); - } - - @Override - public Unsafe unsafe() { - return unsafe; - } - - @Override - public void shutdown() { - for (EventLoop l: activeChildren) { - l.shutdown(); - } - for (EventLoop l: idleChildren) { - l.shutdown(); - } - } - - @Override - public List shutdownNow() { - for (EventLoop l: activeChildren) { - l.shutdownNow(); - } - for (EventLoop l: idleChildren) { - l.shutdownNow(); - } - return Collections.emptyList(); - } - - @Override - public boolean isShutdown() { - for (EventLoop l: activeChildren) { - if (!l.isShutdown()) { - return false; - } - } - for (EventLoop l: idleChildren) { - if (!l.isShutdown()) { - return false; - } - } - return true; - } - - @Override - public boolean isTerminated() { - for (EventLoop l: activeChildren) { - if (!l.isTerminated()) { - return false; - } - } - for (EventLoop l: idleChildren) { - if (!l.isTerminated()) { - return false; - } - } - return true; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) - throws InterruptedException { - long deadline = System.nanoTime() + unit.toNanos(timeout); - for (EventLoop l: activeChildren) { - for (;;) { - long timeLeft = deadline - System.nanoTime(); - if (timeLeft <= 0) { - return isTerminated(); - } - if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { - break; - } - } - } - for (EventLoop l: idleChildren) { - for (;;) { - long timeLeft = deadline - System.nanoTime(); - if (timeLeft <= 0) { - return isTerminated(); - } - if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { - break; - } - } - } - return isTerminated(); - } - - @Override - public Future submit(Callable task) { - return currentEventLoop().submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - return currentEventLoop().submit(task, result); - } - - @Override - public Future submit(Runnable task) { - return currentEventLoop().submit(task); - } - - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - return currentEventLoop().invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return currentEventLoop().invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { - return currentEventLoop().invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, - long timeout, TimeUnit unit) throws InterruptedException, - ExecutionException, TimeoutException { - return currentEventLoop().invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - currentEventLoop().execute(command); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { - return currentEventLoop().schedule(command, delay, unit); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return currentEventLoop().schedule(callable, delay, unit); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - @Override - public ChannelFuture register(Channel channel) { - if (channel == null) { - throw new NullPointerException("channel"); - } - try { - return nextChild().register(channel); - } catch (Throwable t) { - return channel.newFailedFuture(t); - } + OioEventLoop(OioEventLoopGroup parent) { + super(parent, parent.threadFactory); + this.parent = parent; } @Override public ChannelFuture register(Channel channel, ChannelFuture future) { - if (channel == null) { - throw new NullPointerException("channel"); - } - try { - return nextChild().register(channel, future); - } catch (Throwable t) { - return channel.newFailedFuture(t); - } - } - - @Override - public boolean inEventLoop() { - return SingleThreadEventExecutor.currentEventLoop() != null; - } - - @Override - public boolean inEventLoop(Thread thread) { - throw new UnsupportedOperationException(); - } - - private EventLoop nextChild() { - OioChildEventLoop loop = idleChildren.poll(); - if (loop == null) { - if (maxChannels > 0 && activeChildren.size() >= maxChannels) { - throw tooManyChannels; + return super.register(channel, future).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + ch = (AbstractOioChannel) future.channel(); + } else { + deregister(); + } } - loop = new OioChildEventLoop(this); - } - activeChildren.add(loop); - return loop; + }); } - private static OioChildEventLoop currentEventLoop() { - OioChildEventLoop loop = - (OioChildEventLoop) SingleThreadEventExecutor.currentEventLoop(); - if (loop == null) { - throw new IllegalStateException("not called from an event loop thread"); + @Override + protected void run() { + for (;;) { + AbstractOioChannel ch = this.ch; + if (ch == null || !ch.isActive()) { + Runnable task; + try { + task = takeTask(); + task.run(); + } catch (InterruptedException e) { + // Waken up by interruptThread() + } + } else { + long startTime = System.nanoTime(); + for (;;) { + final Runnable task = pollTask(); + if (task == null) { + break; + } + + task.run(); + + // Ensure running tasks doesn't take too much time. + if (System.nanoTime() - startTime > AbstractOioChannel.SO_TIMEOUT * 1000000L) { + break; + } + } + + ch.unsafe().read(); + + // Handle deregistration + if (!ch.isRegistered()) { + runAllTasks(); + deregister(); + } + } + + if (isShutdown()) { + if (ch != null) { + ch.unsafe().close(ch.unsafe().voidFuture()); + } + if (peekTask() == null) { + break; + } + } } - return loop; + } + + @Override + protected void wakeup(boolean inEventLoop) { + interruptThread(); + } + + private void deregister() { + ch = null; + parent.activeChildren.remove(this); + parent.idleChildren.add(this); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java new file mode 100644 index 0000000000..76e473faf2 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoopGroup.java @@ -0,0 +1,176 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.oio; + + +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; + +import java.util.Collections; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +public class OioEventLoopGroup implements EventLoopGroup { + + private final int maxChannels; + final ThreadFactory threadFactory; + final Set activeChildren = Collections.newSetFromMap( + new ConcurrentHashMap()); + final Queue idleChildren = new ConcurrentLinkedQueue(); + private final ChannelException tooManyChannels; + + public OioEventLoopGroup() { + this(0); + } + + public OioEventLoopGroup(int maxChannels) { + this(maxChannels, Executors.defaultThreadFactory()); + } + + public OioEventLoopGroup(int maxChannels, ThreadFactory threadFactory) { + if (maxChannels < 0) { + throw new IllegalArgumentException(String.format( + "maxChannels: %d (expected: >= 0)", maxChannels)); + } + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + + this.maxChannels = maxChannels; + this.threadFactory = threadFactory; + + tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')'); + tooManyChannels.setStackTrace(new StackTraceElement[0]); + } + + @Override + public EventLoop next() { + throw new UnsupportedOperationException(); + } + + @Override + public void shutdown() { + for (EventLoop l: activeChildren) { + l.shutdown(); + } + for (EventLoop l: idleChildren) { + l.shutdown(); + } + } + + @Override + public boolean isShutdown() { + for (EventLoop l: activeChildren) { + if (!l.isShutdown()) { + return false; + } + } + for (EventLoop l: idleChildren) { + if (!l.isShutdown()) { + return false; + } + } + return true; + } + + @Override + public boolean isTerminated() { + for (EventLoop l: activeChildren) { + if (!l.isTerminated()) { + return false; + } + } + for (EventLoop l: idleChildren) { + if (!l.isTerminated()) { + return false; + } + } + return true; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + long deadline = System.nanoTime() + unit.toNanos(timeout); + for (EventLoop l: activeChildren) { + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + return isTerminated(); + } + if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } + } + for (EventLoop l: idleChildren) { + for (;;) { + long timeLeft = deadline - System.nanoTime(); + if (timeLeft <= 0) { + return isTerminated(); + } + if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { + break; + } + } + } + return isTerminated(); + } + + @Override + public ChannelFuture register(Channel channel) { + if (channel == null) { + throw new NullPointerException("channel"); + } + try { + return nextChild().register(channel); + } catch (Throwable t) { + return channel.newFailedFuture(t); + } + } + + @Override + public ChannelFuture register(Channel channel, ChannelFuture future) { + if (channel == null) { + throw new NullPointerException("channel"); + } + try { + return nextChild().register(channel, future); + } catch (Throwable t) { + return channel.newFailedFuture(t); + } + } + + private EventLoop nextChild() { + OioEventLoop loop = idleChildren.poll(); + if (loop == null) { + if (maxChannels > 0 && activeChildren.size() >= maxChannels) { + throw tooManyChannels; + } + loop = new OioEventLoop(this); + } + activeChildren.add(loop); + return loop; + } +} diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 9baf6d34d4..bb9eaa0481 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -256,7 +256,7 @@ public class SingleThreadEventLoopTest { final AtomicInteger cleanedUp = new AtomicInteger(); SingleThreadEventLoopImpl() { - super(Executors.defaultThreadFactory()); + super(null, Executors.defaultThreadFactory()); } @Override diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java index 55ad59a70f..c5e0c8ea93 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java @@ -42,12 +42,12 @@ public class LocalChannelRegistryTest { Bootstrap cb = new Bootstrap(); ServerBootstrap sb = new ServerBootstrap(); - cb.eventLoop(new LocalEventLoop()) + cb.group(new LocalEventLoopGroup()) .channel(new LocalChannel()) .remoteAddress(addr) .handler(new TestHandler()); - sb.eventLoop(new LocalEventLoop(), new LocalEventLoop()) + sb.group(new LocalEventLoopGroup(), new LocalEventLoopGroup()) .channel(new LocalServerChannel()) .localAddress(addr) .childHandler(new ChannelInitializer() { diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 4423bfd6c6..db0cff8153 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -29,9 +29,9 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundByteHandler; import io.netty.channel.ChannelOutboundMessageHandler; -import io.netty.channel.DefaultEventExecutor; -import io.netty.channel.EventExecutor; -import io.netty.channel.EventLoop; +import io.netty.channel.DefaultEventExecutorGroup; +import io.netty.channel.EventExecutorGroup; +import io.netty.channel.EventLoopGroup; import java.util.HashSet; import java.util.Queue; @@ -57,7 +57,7 @@ public class LocalTransportThreadModelTest { public static void init() { // Configure a test server sb = new ServerBootstrap(); - sb.eventLoop(new LocalEventLoop(), new LocalEventLoop()) + sb.group(new LocalEventLoopGroup(), new LocalEventLoopGroup()) .channel(new LocalServerChannel()) .localAddress(LocalAddress.ANY) .childHandler(new ChannelInitializer() { @@ -89,9 +89,9 @@ public class LocalTransportThreadModelTest { @Test(timeout = 5000) public void testStagedExecution() throws Throwable { - EventLoop l = new LocalEventLoop(4, new PrefixThreadFactory("l")); - EventExecutor e1 = new DefaultEventExecutor(4, new PrefixThreadFactory("e1")); - EventExecutor e2 = new DefaultEventExecutor(4, new PrefixThreadFactory("e2")); + EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l")); + EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1")); + EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e2")); ThreadNameAuditor h1 = new ThreadNameAuditor(); ThreadNameAuditor h2 = new ThreadNameAuditor(); ThreadNameAuditor h3 = new ThreadNameAuditor(); @@ -206,12 +206,12 @@ public class LocalTransportThreadModelTest { @Test(timeout = 60000) public void testConcurrentMessageBufferAccess() throws Throwable { - EventLoop l = new LocalEventLoop(4, new PrefixThreadFactory("l")); - EventExecutor e1 = new DefaultEventExecutor(4, new PrefixThreadFactory("e1")); - EventExecutor e2 = new DefaultEventExecutor(4, new PrefixThreadFactory("e2")); - EventExecutor e3 = new DefaultEventExecutor(4, new PrefixThreadFactory("e3")); - EventExecutor e4 = new DefaultEventExecutor(4, new PrefixThreadFactory("e4")); - EventExecutor e5 = new DefaultEventExecutor(4, new PrefixThreadFactory("e5")); + EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l")); + EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1")); + EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e2")); + EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e3")); + EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e4")); + EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e5")); try { final MessageForwarder1 h1 = new MessageForwarder1();