[#502] Split EventLoop/EventExecutor into parent and children

- Add EventExecutorGroup and EventLoopGroup
- EventExecutor and EventLoop extends EventExecutorGroup and
  EventLoopGroup
  - They form their own group so that .next() returns itself.
- Rename Bootstrap.eventLoop() to group()
- Rename parameter names such as executor to group
- Rename *EventLoop/Executor to *EventLoop/ExecutorGroup
- Rename *ChildEventLoop/Executor to *EventLoop/Executor
This commit is contained in:
Trustin Lee 2012-08-10 20:17:18 +09:00
parent f4fa5698c1
commit d298707198
64 changed files with 958 additions and 1051 deletions

View File

@ -17,7 +17,7 @@ package io.netty.example.discard;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
@ -38,7 +38,7 @@ public class DiscardClient {
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.handler(new DiscardClientHandler(firstMessageSize));

View File

@ -19,7 +19,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@ -36,7 +36,7 @@ public class DiscardServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {

View File

@ -20,7 +20,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
@ -49,7 +49,7 @@ public class EchoClient {
// Configure the client.
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioSocketChannel())
.option(ChannelOption.TCP_NODELAY, true)
.remoteAddress(new InetSocketAddress(host, port))

View File

@ -20,7 +20,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
@ -42,7 +42,7 @@ public class EchoServer {
// Configure the server.
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.option(ChannelOption.SO_BACKLOG, 100)
.localAddress(new InetSocketAddress(port))

View File

@ -17,7 +17,7 @@ package io.netty.example.factorial;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
@ -39,7 +39,7 @@ public class FactorialClient {
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.handler(new FactorialClientInitializer(count));

View File

@ -16,7 +16,7 @@
package io.netty.example.factorial;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@ -34,7 +34,7 @@ public class FactorialServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new FactorialServerInitializer());

View File

@ -16,7 +16,7 @@
package io.netty.example.http.file;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class HttpStaticFileServer {
@ -30,7 +30,7 @@ public class HttpStaticFileServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new HttpStaticFileServerInitializer());

View File

@ -17,7 +17,7 @@ package io.netty.example.http.snoop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.ClientCookieEncoder;
import io.netty.handler.codec.http.DefaultCookie;
@ -64,7 +64,7 @@ public class HttpSnoopClient {
// Configure the client.
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioSocketChannel())
.handler(new HttpSnoopClientInitializer(ssl))
.remoteAddress(new InetSocketAddress(host, port));

View File

@ -17,7 +17,7 @@ package io.netty.example.http.snoop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
@ -39,7 +39,7 @@ public class HttpSnoopServer {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.childHandler(new HttpSnoopServerInitializer())
.localAddress(new InetSocketAddress(port));

View File

@ -17,7 +17,7 @@ package io.netty.example.http.websocketx.autobahn;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@ -35,7 +35,7 @@ public class AutobahnServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new AutobahnServerInitializer());

View File

@ -42,7 +42,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
@ -83,7 +83,7 @@ public class WebSocketClient {
new WebSocketClientHandshakerFactory().newHandshaker(
uri, WebSocketVersion.V13, null, false, customHeaders);
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioSocketChannel())
.remoteAddress(uri.getHost(), uri.getPort())
.handler(new ChannelInitializer<SocketChannel>() {

View File

@ -17,7 +17,7 @@ package io.netty.example.http.websocketx.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@ -50,7 +50,7 @@ public class WebSocketServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new WebSocketServerInitializer());

View File

@ -17,7 +17,7 @@ package io.netty.example.http.websocketx.sslserver;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@ -49,7 +49,7 @@ public class WebSocketSslServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new WebSocketSslServerInitializer());

View File

@ -22,9 +22,9 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoop;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
@ -49,7 +49,7 @@ public class LocalEcho {
// Note that we can use any event loop to ensure certain local channels
// are handled by the same event loop thread which drives a certain socket channel
// to reduce the communication latency between socket channels and local channels.
sb.eventLoop(new LocalEventLoop(), new LocalEventLoop())
sb.group(new LocalEventLoopGroup(), new LocalEventLoopGroup())
.channel(new LocalServerChannel())
.localAddress(addr)
.handler(new ChannelInitializer<LocalServerChannel>() {
@ -67,7 +67,7 @@ public class LocalEcho {
}
});
cb.eventLoop(new NioEventLoop())
cb.group(new NioEventLoopGroup())
.channel(new LocalChannel())
.remoteAddress(addr)
.handler(new ChannelInitializer<LocalChannel>() {

View File

@ -17,7 +17,7 @@ package io.netty.example.localtime;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.ArrayList;
@ -45,7 +45,7 @@ public class LocalTimeClient {
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.handler(new LocalTimeClientInitializer());

View File

@ -16,7 +16,7 @@
package io.netty.example.localtime;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@ -34,7 +34,7 @@ public class LocalTimeServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new LocalTimeServerInitializer());

View File

@ -18,7 +18,7 @@ package io.netty.example.objectecho;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.echo.EchoClient;
import io.netty.handler.codec.serialization.ClassResolvers;
@ -43,7 +43,7 @@ public class ObjectEchoClient {
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.handler(new ChannelInitializer<SocketChannel>() {

View File

@ -18,7 +18,7 @@ package io.netty.example.objectecho;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.example.echo.EchoServer;
import io.netty.handler.codec.serialization.ClassResolvers;
@ -39,7 +39,7 @@ public class ObjectEchoServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {

View File

@ -18,7 +18,7 @@ package io.netty.example.portunification;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@ -39,7 +39,7 @@ public class PortUnificationServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {

View File

@ -16,7 +16,7 @@
package io.netty.example.proxy;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class HexDumpProxy {
@ -39,7 +39,7 @@ public class HexDumpProxy {
// Configure the bootstrap.
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(localPort)
.childHandler(new HexDumpProxyInitializer(remoteHost, remotePort));

View File

@ -44,7 +44,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundByteHandlerAdapte
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.eventLoop(inboundChannel.eventLoop())
b.group(inboundChannel.eventLoop())
.channel(new NioSocketChannel())
.remoteAddress(remoteHost, remotePort)
.handler(new HexDumpProxyBackendHandler(inboundChannel));

View File

@ -21,7 +21,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
@ -43,7 +43,7 @@ public class QuoteOfTheMomentClient {
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioDatagramChannel())
.localAddress(new InetSocketAddress(0))
.option(ChannelOption.SO_BROADCAST, true)

View File

@ -18,7 +18,7 @@ package io.netty.example.qotm;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
@ -39,7 +39,7 @@ public class QuoteOfTheMomentServer {
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioDatagramChannel())
.localAddress(new InetSocketAddress(port))
.option(ChannelOption.SO_BROADCAST, true)

View File

@ -18,7 +18,7 @@ package io.netty.example.securechat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.telnet.TelnetClient;
@ -41,7 +41,7 @@ public class SecureChatClient {
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.handler(new SecureChatClientInitializer());

View File

@ -16,7 +16,7 @@
package io.netty.example.securechat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.example.telnet.TelnetServer;
@ -34,7 +34,7 @@ public class SecureChatServer {
public void run() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new SecureChatServerInitializer());

View File

@ -18,7 +18,7 @@ package io.netty.example.telnet;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
@ -40,7 +40,7 @@ public class TelnetClient {
public void run() throws Exception {
Bootstrap b = new Bootstrap();
try {
b.eventLoop(new NioEventLoop())
b.group(new NioEventLoopGroup())
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.handler(new TelnetClientInitializer());

View File

@ -16,7 +16,7 @@
package io.netty.example.telnet;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@ -33,7 +33,7 @@ public class TelnetServer {
public void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
try {
b.eventLoop(new NioEventLoop(), new NioEventLoop())
b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(new NioServerSocketChannel())
.localAddress(port)
.childHandler(new TelnetServerPipelineFactory());

View File

@ -17,9 +17,9 @@ package io.netty.example.uptime;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
@ -54,11 +54,11 @@ public class UptimeClient {
}
private Bootstrap configureBootstrap(Bootstrap b) {
return configureBootstrap(b, new NioEventLoop());
return configureBootstrap(b, new NioEventLoopGroup());
}
Bootstrap configureBootstrap(Bootstrap b, EventLoop l) {
b.eventLoop(l)
Bootstrap configureBootstrap(Bootstrap b, EventLoopGroup g) {
b.group(g)
.channel(new NioSocketChannel())
.remoteAddress(host, port)
.handler(new ChannelInitializer<SocketChannel>() {

View File

@ -18,15 +18,15 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.aio.AioEventLoop;
import io.netty.channel.socket.aio.AioEventLoopGroup;
import io.netty.channel.socket.aio.AioServerSocketChannel;
import io.netty.channel.socket.aio.AioSocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.oio.OioDatagramChannel;
import io.netty.channel.socket.oio.OioEventLoop;
import io.netty.channel.socket.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
@ -47,16 +47,16 @@ final class SocketTestPermutation {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
eventLoop(new NioEventLoop(), new NioEventLoop()).
group(new NioEventLoopGroup(), new NioEventLoopGroup()).
channel(new NioServerSocketChannel());
}
});
sbfs.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
AioEventLoop loop = new AioEventLoop();
AioEventLoopGroup loop = new AioEventLoopGroup();
return new ServerBootstrap().
eventLoop(loop, loop).
group(loop, loop).
channel(new AioServerSocketChannel(loop));
}
});
@ -64,7 +64,7 @@ final class SocketTestPermutation {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
eventLoop(new OioEventLoop(), new OioEventLoop()).
group(new OioEventLoopGroup(), new OioEventLoopGroup()).
channel(new OioServerSocketChannel());
}
});
@ -75,20 +75,20 @@ final class SocketTestPermutation {
cbfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().eventLoop(new NioEventLoop()).channel(new NioSocketChannel());
return new Bootstrap().group(new NioEventLoopGroup()).channel(new NioSocketChannel());
}
});
cbfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
AioEventLoop loop = new AioEventLoop();
return new Bootstrap().eventLoop(loop).channel(new AioSocketChannel(loop));
AioEventLoopGroup loop = new AioEventLoopGroup();
return new Bootstrap().group(loop).channel(new AioSocketChannel(loop));
}
});
cbfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().eventLoop(new OioEventLoop()).channel(new OioSocketChannel());
return new Bootstrap().group(new OioEventLoopGroup()).channel(new OioSocketChannel());
}
});
@ -132,14 +132,14 @@ final class SocketTestPermutation {
bfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().eventLoop(new NioEventLoop()).channel(
return new Bootstrap().group(new NioEventLoopGroup()).channel(
new NioDatagramChannel(InternetProtocolFamily.IPv4));
}
});
bfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().eventLoop(new OioEventLoop()).channel(new OioDatagramChannel());
return new Bootstrap().group(new OioEventLoopGroup()).channel(new OioDatagramChannel());
}
});

View File

@ -22,7 +22,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -39,20 +39,20 @@ public class Bootstrap {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class);
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private EventLoop eventLoop;
private EventLoopGroup group;
private Channel channel;
private ChannelHandler handler;
private SocketAddress localAddress;
private SocketAddress remoteAddress;
public Bootstrap eventLoop(EventLoop eventLoop) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
public Bootstrap group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.eventLoop != null) {
throw new IllegalStateException("eventLoop set already");
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.eventLoop = eventLoop;
this.group = group;
return this;
}
@ -201,7 +201,7 @@ public class Bootstrap {
}
}
eventLoop.register(channel).syncUninterruptibly();
group.register(channel).syncUninterruptibly();
}
private static boolean ensureOpen(ChannelFuture future) {
@ -215,14 +215,14 @@ public class Bootstrap {
}
public void shutdown() {
if (eventLoop != null) {
eventLoop.shutdown();
if (group != null) {
group.shutdown();
}
}
private void validate() {
if (eventLoop == null) {
throw new IllegalStateException("eventLoop not set");
if (group == null) {
throw new IllegalStateException("group not set");
}
if (channel == null) {
throw new IllegalStateException("channel not set");

View File

@ -28,7 +28,7 @@ import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -56,22 +56,22 @@ public class ServerBootstrap {
private final Map<ChannelOption<?>, Object> parentOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private EventLoop parentEventLoop;
private EventLoop childEventLoop;
private EventLoopGroup parentGroup;
private EventLoopGroup childGroup;
private ServerChannel channel;
private ChannelHandler handler;
private ChannelHandler childHandler;
private SocketAddress localAddress;
public ServerBootstrap eventLoop(EventLoop parentEventLoop, EventLoop childEventLoop) {
if (parentEventLoop == null) {
throw new NullPointerException("parentEventLoop");
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
if (parentGroup == null) {
throw new NullPointerException("parentGroup");
}
if (this.parentEventLoop != null) {
throw new IllegalStateException("eventLoop set already");
if (this.parentGroup != null) {
throw new IllegalStateException("parentGroup set already");
}
this.parentEventLoop = parentEventLoop;
this.childEventLoop = childEventLoop;
this.parentGroup = parentGroup;
this.childGroup = childGroup;
return this;
}
@ -179,7 +179,7 @@ public class ServerBootstrap {
}
p.addLast(acceptor);
ChannelFuture f = parentEventLoop.register(channel).awaitUninterruptibly();
ChannelFuture f = parentGroup.register(channel).awaitUninterruptibly();
if (!f.isSuccess()) {
future.setFailure(f.cause());
return future;
@ -198,17 +198,17 @@ public class ServerBootstrap {
}
public void shutdown() {
if (parentEventLoop != null) {
parentEventLoop.shutdown();
if (parentGroup != null) {
parentGroup.shutdown();
}
if (childEventLoop != null) {
childEventLoop.shutdown();
if (childGroup != null) {
childGroup.shutdown();
}
}
private void validate() {
if (parentEventLoop == null) {
throw new IllegalStateException("eventLoop not set");
if (parentGroup == null) {
throw new IllegalStateException("parentGroup not set");
}
if (channel == null) {
throw new IllegalStateException("channel not set");
@ -216,9 +216,9 @@ public class ServerBootstrap {
if (childHandler == null) {
throw new IllegalStateException("childHandler not set");
}
if (childEventLoop == null) {
logger.warn("childEventLoop is not set. Using eventLoop instead.");
childEventLoop = parentEventLoop;
if (childGroup == null) {
logger.warn("childGroup is not set. Using parentGroup instead.");
childGroup = parentGroup;
}
if (localAddress == null) {
logger.warn("localAddress is not set. Using " + DEFAULT_LOCAL_ADDR + " instead.");
@ -267,7 +267,7 @@ public class ServerBootstrap {
}
try {
childEventLoop.register(child);
childGroup.register(child);
} catch (Throwable t) {
logger.warn("Failed to register an accepted channel: " + child, t);
}

View File

@ -234,7 +234,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* @throws NullPointerException
* if the specified name or handler is {@code null}
*/
ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
@ -260,7 +260,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* @throws NullPointerException
* if the specified name or handler is {@code null}
*/
ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
@ -294,7 +294,7 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* @throws NullPointerException
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
@ -328,15 +328,15 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
* @throws NullPointerException
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addAfter(EventExecutor executor, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
/**
* Removes the specified {@link ChannelHandler} from this pipeline.

View File

@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -149,7 +148,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@SuppressWarnings("unchecked")
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor,
DefaultChannelPipeline pipeline, EventExecutorGroup group,
DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
String name, ChannelHandler handler) {
@ -188,19 +187,19 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
this.name = name;
this.handler = handler;
if (executor != null) {
if (group != null) {
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = pipeline.childExecutors.get(executor);
EventExecutor childExecutor = pipeline.childExecutors.get(group);
if (childExecutor == null) {
childExecutor = executor.unsafe().nextChild();
pipeline.childExecutors.put(executor, childExecutor);
childExecutor = group.next();
pipeline.childExecutors.put(group, childExecutor);
}
this.executor = childExecutor;
executor = childExecutor;
} else if (channel.isRegistered()) {
this.executor = channel.eventLoop();
executor = channel.eventLoop();
} else {
this.executor = null;
executor = null;
}
if (type.contains(ChannelHandlerType.INBOUND)) {
@ -805,6 +804,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public void readable(boolean readable) {
this.pipeline.readable(this, readable);
pipeline.readable(this, readable);
}
}

View File

@ -55,8 +55,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private boolean firedChannelActive;
private boolean fireInboundBufferUpdatedOnActivation;
final Map<EventExecutor, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutor, EventExecutor>();
final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>();
private final AtomicInteger suspendRead = new AtomicInteger();
public DefaultChannelPipeline(Channel channel) {
@ -84,7 +84,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelPipeline addFirst(EventExecutor executor, final String name, final ChannelHandler handler) {
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, final ChannelHandler handler) {
try {
Future<Throwable> future;
@ -92,7 +92,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
checkDuplicateName(name);
final DefaultChannelHandlerContext nextCtx = head.next;
final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, executor, head, nextCtx, name, handler);
new DefaultChannelHandlerContext(this, group, head, nextCtx, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addFirst0(name, nextCtx, newCtx);
@ -143,7 +143,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelPipeline addLast(EventExecutor executor, final String name, final ChannelHandler handler) {
public ChannelPipeline addLast(EventExecutorGroup group, final String name, final ChannelHandler handler) {
try {
Future<Throwable> future;
@ -152,7 +152,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
final DefaultChannelHandlerContext oldTail = tail;
final DefaultChannelHandlerContext newTail =
new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler);
new DefaultChannelHandlerContext(this, group, oldTail, null, name, handler);
if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) {
addLast0(name, oldTail, newTail);
@ -203,7 +203,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addBefore(
EventExecutor executor, String baseName, final String name, final ChannelHandler handler) {
EventExecutorGroup group, String baseName, final String name, final ChannelHandler handler) {
try {
Future<Throwable> future;
@ -211,7 +211,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
final DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name);
final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler);
new DefaultChannelHandlerContext(this, group, ctx.prev, ctx, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addBefore0(name, ctx, newCtx);
@ -262,7 +262,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addAfter(
EventExecutor executor, String baseName, final String name, final ChannelHandler handler) {
EventExecutorGroup group, String baseName, final String name, final ChannelHandler handler) {
try {
Future<Throwable> future;
@ -274,7 +274,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
checkDuplicateName(name);
final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, executor, ctx, ctx.next, name, handler);
new DefaultChannelHandlerContext(this, group, ctx, ctx.next, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addAfter0(name, ctx, newCtx);
@ -325,7 +325,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelPipeline addFirst(EventExecutor executor, ChannelHandler... handlers) {
public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
@ -354,7 +354,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelPipeline addLast(EventExecutor executor, ChannelHandler... handlers) {
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}

View File

@ -17,18 +17,33 @@ package io.netty.channel;
import java.util.concurrent.ThreadFactory;
public class DefaultEventExecutor extends MultithreadEventExecutor {
class DefaultEventExecutor extends SingleThreadEventExecutor {
public DefaultEventExecutor(int nThreads) {
this(nThreads, null);
}
public DefaultEventExecutor(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
DefaultEventExecutor(DefaultEventExecutorGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new DefaultChildEventExecutor(threadFactory);
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}
}

View File

@ -17,33 +17,18 @@ package io.netty.channel;
import java.util.concurrent.ThreadFactory;
class DefaultChildEventExecutor extends SingleThreadEventExecutor {
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
DefaultChildEventExecutor(ThreadFactory threadFactory) {
super(threadFactory);
public DefaultEventExecutorGroup(int nThreads) {
this(nThreads, null);
}
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
break;
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new DefaultEventExecutor(this, threadFactory);
}
}

View File

@ -17,12 +17,8 @@ package io.netty.channel;
import java.util.concurrent.ScheduledExecutorService;
public interface EventExecutor extends ScheduledExecutorService {
public interface EventExecutor extends EventExecutorGroup, ScheduledExecutorService {
EventExecutorGroup parent();
boolean inEventLoop();
boolean inEventLoop(Thread thread);
Unsafe unsafe();
interface Unsafe {
EventExecutor nextChild();
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public interface EventExecutorGroup {
/**
* Returns one of the {@link EventExecutor}s that belong to this group.
*/
EventExecutor next();
/**
* Shuts down all {@link EventExecutor}s managed by this group.
*
* @see ExecutorService#shutdown()
*/
void shutdown();
/**
* Returns {@code true} if and only if {@link #shutdown()} has been called.
*
* @see ExecutorService#isShutdown()
*/
boolean isShutdown();
/**
* Returns {@code true} if and only if {@link #shutdown()} has been called and all
* {@link EventExecutor}s managed by this group has been terminated completely.
*
* @see ExecutorService#isTerminated()
*/
boolean isTerminated();
/**
* Waits until {@link #isTerminated()} returns {@code true} or the specified amount of time
* passes.
*
* @see ExecutorService#awaitTermination(long, TimeUnit)
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
}

View File

@ -15,7 +15,7 @@
*/
package io.netty.channel;
public interface EventLoop extends EventExecutor {
ChannelFuture register(Channel channel);
ChannelFuture register(Channel channel, ChannelFuture future);
public interface EventLoop extends EventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
public interface EventLoopGroup extends EventExecutorGroup {
@Override
EventLoop next();
ChannelFuture register(Channel channel);
ChannelFuture register(Channel channel, ChannelFuture future);
}

View File

@ -15,33 +15,19 @@
*/
package io.netty.channel;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class MultithreadEventExecutor implements EventExecutor {
public abstract class MultithreadEventExecutorGroup implements EventExecutorGroup {
private static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final AtomicInteger poolId = new AtomicInteger();
private final EventExecutor[] children;
private final AtomicInteger childIndex = new AtomicInteger();
private final Unsafe unsafe = new Unsafe() {
@Override
public EventExecutor nextChild() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
};
protected MultithreadEventExecutor(int nThreads, ThreadFactory threadFactory, Object... args) {
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads < 0) {
throw new IllegalArgumentException(String.format(
"nThreads: %d (expected: >= 0)", nThreads));
@ -72,13 +58,13 @@ public abstract class MultithreadEventExecutor implements EventExecutor {
}
}
protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception;
@Override
public Unsafe unsafe() {
return unsafe;
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
protected abstract EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception;
@Override
public void shutdown() {
for (EventExecutor l: children) {
@ -86,14 +72,6 @@ public abstract class MultithreadEventExecutor implements EventExecutor {
}
}
@Override
public List<Runnable> shutdownNow() {
for (EventExecutor l: children) {
l.shutdownNow();
}
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
for (EventExecutor l: children) {
@ -132,97 +110,12 @@ public abstract class MultithreadEventExecutor implements EventExecutor {
return isTerminated();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return currentEventLoop().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return currentEventLoop().submit(task, result);
}
@Override
public Future<?> submit(Runnable task) {
return currentEventLoop().submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return currentEventLoop().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return currentEventLoop().invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
currentEventLoop().execute(command);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
return currentEventLoop().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return currentEventLoop().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
@Override
public boolean inEventLoop() {
throw new UnsupportedOperationException();
}
@Override
public boolean inEventLoop(Thread thread) {
throw new UnsupportedOperationException();
}
private static EventExecutor currentEventLoop() {
EventExecutor loop = SingleThreadEventExecutor.currentEventLoop();
if (loop == null) {
throw new IllegalStateException("not called from an event loop thread");
}
return loop;
}
private final class DefaultThreadFactory implements ThreadFactory {
private final AtomicInteger nextId = new AtomicInteger();
private final String prefix;
DefaultThreadFactory() {
String typeName = MultithreadEventExecutor.this.getClass().getSimpleName();
String typeName = MultithreadEventExecutorGroup.this.getClass().getSimpleName();
typeName = "" + Character.toLowerCase(typeName.charAt(0)) + typeName.substring(1);
prefix = typeName + '-' + poolId.incrementAndGet() + '-';
}

View File

@ -17,20 +17,25 @@ package io.netty.channel;
import java.util.concurrent.ThreadFactory;
public abstract class MultithreadEventLoop extends MultithreadEventExecutor implements EventLoop {
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
protected MultithreadEventLoop(int nThreads, ThreadFactory threadFactory,
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory,
Object... args) {
super(nThreads, threadFactory, args);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public ChannelFuture register(Channel channel) {
return ((EventLoop) unsafe().nextChild()).register(channel);
return next().register(channel);
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
return ((EventLoop) unsafe().nextChild()).register(channel, future);
return next().register(channel, future);
}
}

View File

@ -64,13 +64,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
return nanoTime() + delay;
}
private final Unsafe unsafe = new Unsafe() {
@Override
public EventExecutor nextChild() {
return SingleThreadEventExecutor.this;
}
};
private final EventExecutorGroup parent;
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
private final Thread thread;
private final Object stateLock = new Object();
@ -83,7 +77,13 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
private long lastCheckTimeNanos;
private long lastPurgeTimeNanos;
protected SingleThreadEventExecutor(ThreadFactory threadFactory) {
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
@ -127,8 +127,13 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
}
@Override
public Unsafe unsafe() {
return unsafe;
public EventExecutorGroup parent() {
return parent;
}
@Override
public EventExecutor next() {
return this;
}
protected void interruptThread() {

View File

@ -19,8 +19,18 @@ import java.util.concurrent.ThreadFactory;
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected SingleThreadEventLoop(ThreadFactory threadFactory) {
super(threadFactory);
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override
public EventLoopGroup parent() {
return (EventLoopGroup) super.parent();
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override

View File

@ -17,8 +17,8 @@ package io.netty.channel.embedded;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import java.util.Collections;
import java.util.List;
@ -27,8 +27,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
class EmbeddedEventLoop extends AbstractExecutorService implements
EventLoop, EventExecutor.Unsafe {
class EmbeddedEventLoop extends AbstractExecutorService implements EventLoop {
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
@ -108,12 +107,12 @@ class EmbeddedEventLoop extends AbstractExecutorService implements
}
@Override
public Unsafe unsafe() {
public EventLoop next() {
return this;
}
@Override
public EventExecutor nextChild() {
public EventLoopGroup parent() {
return this;
}
}

View File

@ -1,55 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.SingleThreadEventLoop;
import java.util.concurrent.ThreadFactory;
final class LocalChildEventLoop extends SingleThreadEventLoop {
LocalChildEventLoop(ThreadFactory threadFactory) {
super(threadFactory);
}
@Override
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown()) {
task = pollTask();
if (task == null) {
break;
}
task.run();
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}
}

View File

@ -15,27 +15,41 @@
*/
package io.netty.channel.local;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoop;
import io.netty.channel.SingleThreadEventLoop;
import java.util.concurrent.ThreadFactory;
public class LocalEventLoop extends MultithreadEventLoop {
final class LocalEventLoop extends SingleThreadEventLoop {
public LocalEventLoop() {
this(0);
}
public LocalEventLoop(int nThreads) {
this(nThreads, null);
}
public LocalEventLoop(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
LocalEventLoop(LocalEventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new LocalChildEventLoop(threadFactory);
protected void run() {
for (;;) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
if (isShutdown()) {
task = pollTask();
if (task == null) {
break;
}
task.run();
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoopGroup;
import java.util.concurrent.ThreadFactory;
public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
public LocalEventLoopGroup() {
this(0);
}
public LocalEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public LocalEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new LocalEventLoop(this, threadFactory);
}
}

View File

@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
abstract class AbstractAioChannel extends AbstractChannel {
protected final AioEventLoop eventLoop;
protected final AioEventLoopGroup group;
private final AsynchronousChannel ch;
/**
@ -41,10 +41,10 @@ abstract class AbstractAioChannel extends AbstractChannel {
protected ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException;
protected AbstractAioChannel(Channel parent, Integer id, AioEventLoop eventLoop, AsynchronousChannel ch) {
protected AbstractAioChannel(Channel parent, Integer id, AioEventLoopGroup group, AsynchronousChannel ch) {
super(parent, id);
this.ch = ch;
this.eventLoop = eventLoop;
this.group = group;
}
@Override
@ -68,10 +68,10 @@ abstract class AbstractAioChannel extends AbstractChannel {
@Override
protected Runnable doRegister() throws Exception {
if (((AioChildEventLoop) eventLoop()).parent != eventLoop) {
if (((AioChildEventLoop) eventLoop()).parent() != group) {
throw new ChannelException(
getClass().getSimpleName() + " must be registered to the " +
AioEventLoop.class.getSimpleName() + " which was specified in the constructor.");
AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor.");
}
return null;
}

View File

@ -21,11 +21,8 @@ import java.util.concurrent.ThreadFactory;
final class AioChildEventLoop extends SingleThreadEventLoop {
final AioEventLoop parent;
AioChildEventLoop(AioEventLoop parent, ThreadFactory threadFactory) {
super(threadFactory);
this.parent = parent;
AioChildEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory);
}
@Override

View File

@ -17,49 +17,48 @@ package io.netty.channel.socket.aio;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoopException;
import io.netty.channel.MultithreadEventLoop;
import io.netty.channel.MultithreadEventLoopGroup;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class AioEventLoop extends MultithreadEventLoop {
public class AioEventLoopGroup extends MultithreadEventLoopGroup {
private static final ConcurrentMap<Class<?>, Field[]> fieldCache = new ConcurrentHashMap<Class<?>, Field[]>();
private static final Field[] FAILURE = new Field[0];
final AsynchronousChannelGroup group;
public AioEventLoop() {
public AioEventLoopGroup() {
this(0);
}
public AioEventLoop(int nThreads) {
public AioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public AioEventLoop(int nThreads, ThreadFactory threadFactory) {
public AioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
try {
group = AsynchronousChannelGroup.withThreadPool(this);
group = AsynchronousChannelGroup.withThreadPool(new AioExecutorService());
} catch (IOException e) {
throw new EventLoopException("Failed to create an AsynchronousChannelGroup", e);
}
}
@Override
public void execute(Runnable command) {
Class<? extends Runnable> commandType = command.getClass();
if (commandType.getName().startsWith("sun.nio.ch.")) {
executeAioTask(command);
} else {
super.execute(command);
}
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new AioChildEventLoop(this, threadFactory);
}
private void executeAioTask(Runnable command) {
@ -74,7 +73,7 @@ public class AioEventLoop extends MultithreadEventLoop {
if (ch != null) {
l = ch.eventLoop();
} else {
l = unsafe().nextChild();
l = next();
}
if (l.isShutdown()) {
@ -146,8 +145,42 @@ public class AioEventLoop extends MultithreadEventLoop {
return null;
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new AioChildEventLoop(this, threadFactory);
private final class AioExecutorService extends AbstractExecutorService {
@Override
public void shutdown() {
AioEventLoopGroup.this.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
AioEventLoopGroup.this.shutdown();
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return AioEventLoopGroup.this.isShutdown();
}
@Override
public boolean isTerminated() {
return AioEventLoopGroup.this.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return AioEventLoopGroup.this.awaitTermination(timeout, unit);
}
@Override
public void execute(Runnable command) {
Class<? extends Runnable> commandType = command.getClass();
if (commandType.getName().startsWith("sun.nio.ch.")) {
executeAioTask(command);
} else {
next().execute(command);
}
}
}
}

View File

@ -60,7 +60,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
}
}
public AioServerSocketChannel(AioEventLoop eventLoop) {
public AioServerSocketChannel(AioEventLoopGroup eventLoop) {
super(null, null, eventLoop, newSocket(eventLoop.group));
config = new AioServerSocketChannelConfig(javaChannel());
}
@ -147,7 +147,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
// create the socket add it to the buffer and fire the event
channel.pipeline().inboundMessageBuffer().add(
new AioSocketChannel(channel, null, channel.eventLoop, ch));
new AioSocketChannel(channel, null, channel.group, ch));
if (!channel.readSuspended.get()) {
channel.pipeline().fireInboundBufferUpdated();
}

View File

@ -63,13 +63,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
};
public AioSocketChannel(AioEventLoop eventLoop) {
public AioSocketChannel(AioEventLoopGroup eventLoop) {
this(null, null, eventLoop, newSocket(eventLoop.group));
}
AioSocketChannel(
AioServerSocketChannel parent, Integer id,
AioEventLoop eventLoop, AsynchronousSocketChannel ch) {
AioEventLoopGroup eventLoop, AsynchronousSocketChannel ch) {
super(parent, id, eventLoop, ch);
config = new AioSocketChannelConfig(ch);
}
@ -375,7 +375,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (eventLoop().inEventLoop()) {
beginRead();
} else {
eventLoop.execute(readTask);
eventLoop().execute(readTask);
}
}
}

View File

@ -191,7 +191,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof NioChildEventLoop;
return loop instanceof NioEventLoop;
}
@Override
@ -202,7 +202,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected Runnable doRegister() throws Exception {
NioChildEventLoop loop = (NioChildEventLoop) eventLoop();
NioEventLoop loop = (NioEventLoop) eventLoop();
selectionKey = javaChannel().register(
loop.selector, isActive()? defaultInterestOps : 0, this);
return null;
@ -210,7 +210,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doDeregister() throws Exception {
((NioChildEventLoop) eventLoop()).cancel(selectionKey());
((NioEventLoop) eventLoop()).cancel(selectionKey());
}
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;

View File

@ -1,235 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
final class NioChildEventLoop extends SingleThreadEventLoop {
/**
* Internal Netty logger.
*/
protected static final InternalLogger logger = InternalLoggerFactory
.getInstance(NioChildEventLoop.class);
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/**
* The NIO {@link Selector}.
*/
protected final Selector selector;
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeone for
* the select method and the select method will block for that time unless
* waken up.
*/
protected final AtomicBoolean wakenUp = new AtomicBoolean();
private int cancelledKeys;
private boolean cleanedCancelledKeys;
NioChildEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(threadFactory);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
selector = openSelector(selectorProvider);
}
private static Selector openSelector(SelectorProvider provider) {
try {
return provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
}
@Override
protected void run() {
Selector selector = this.selector;
for (;;) {
wakenUp.set(false);
try {
SelectorUtil.select(selector);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
cancelledKeys = 0;
runAllTasks();
processSelectedKeys();
if (isShutdown()) {
closeAll();
if (peekTask() == null) {
break;
}
}
} catch (Throwable t) {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
@Override
protected void cleanup() {
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
}
}
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
cleanedCancelledKeys = true;
SelectorUtil.cleanupKeys(selector);
}
}
private void processSelectedKeys() {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i;
cleanedCancelledKeys = false;
boolean clearSelectedKeys = true;
try {
for (i = selectedKeys.iterator(); i.hasNext();) {
final SelectionKey k = i.next();
final AbstractNioChannel ch = (AbstractNioChannel) k.attachment();
final NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
unsafe.flushNow();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidFuture());
}
if (cleanedCancelledKeys) {
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
clearSelectedKeys = false;
break;
} else {
i = selectedKeys.iterator();
}
}
}
} finally {
if (clearSelectedKeys) {
selectedKeys.clear();
}
}
}
private void closeAll() {
SelectorUtil.cleanupKeys(selector);
Set<SelectionKey> keys = selector.keys();
Collection<Channel> channels = new ArrayList<Channel>(keys.size());
for (SelectionKey k: keys) {
channels.add((Channel) k.attachment());
}
for (Channel ch: channels) {
ch.unsafe().close(ch.unsafe().voidFuture());
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}

View File

@ -15,38 +15,221 @@
*/
package io.netty.channel.socket.nio;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoop;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
public class NioEventLoop extends MultithreadEventLoop {
final class NioEventLoop extends SingleThreadEventLoop {
public NioEventLoop() {
this(0);
/**
* Internal Netty logger.
*/
protected static final InternalLogger logger = InternalLoggerFactory
.getInstance(NioEventLoop.class);
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/**
* The NIO {@link Selector}.
*/
protected final Selector selector;
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeone for
* the select method and the select method will block for that time unless
* waken up.
*/
protected final AtomicBoolean wakenUp = new AtomicBoolean();
private int cancelledKeys;
private boolean cleanedCancelledKeys;
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
selector = openSelector(selectorProvider);
}
public NioEventLoop(int nThreads) {
this(nThreads, null);
}
public NioEventLoop(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
public NioEventLoop(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
private static Selector openSelector(SelectorProvider provider) {
try {
return provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
SelectorProvider selectorProvider;
if (args == null || args.length == 0 || args[0] == null) {
selectorProvider = SelectorProvider.provider();
} else {
selectorProvider = (SelectorProvider) args[0];
protected void run() {
Selector selector = this.selector;
for (;;) {
wakenUp.set(false);
try {
SelectorUtil.select(selector);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
cancelledKeys = 0;
runAllTasks();
processSelectedKeys();
if (isShutdown()) {
closeAll();
if (peekTask() == null) {
break;
}
}
} catch (Throwable t) {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
@Override
protected void cleanup() {
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
}
}
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
cleanedCancelledKeys = true;
SelectorUtil.cleanupKeys(selector);
}
}
private void processSelectedKeys() {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i;
cleanedCancelledKeys = false;
boolean clearSelectedKeys = true;
try {
for (i = selectedKeys.iterator(); i.hasNext();) {
final SelectionKey k = i.next();
final AbstractNioChannel ch = (AbstractNioChannel) k.attachment();
final NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
unsafe.flushNow();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidFuture());
}
if (cleanedCancelledKeys) {
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
clearSelectedKeys = false;
break;
} else {
i = selectedKeys.iterator();
}
}
}
} finally {
if (clearSelectedKeys) {
selectedKeys.clear();
}
}
}
private void closeAll() {
SelectorUtil.cleanupKeys(selector);
Set<SelectionKey> keys = selector.keys();
Collection<Channel> channels = new ArrayList<Channel>(keys.size());
for (SelectionKey k: keys) {
channels.add((Channel) k.attachment());
}
for (Channel ch: channels) {
ch.unsafe().close(ch.unsafe().voidFuture());
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
return new NioChildEventLoop(threadFactory, selectorProvider);
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoopGroup;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory;
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
SelectorProvider selectorProvider;
if (args == null || args.length == 0 || args[0] == null) {
selectorProvider = SelectorProvider.provider();
} else {
selectorProvider = (SelectorProvider) args[0];
}
return new NioEventLoop(this, threadFactory, selectorProvider);
}
}

View File

@ -86,7 +86,7 @@ abstract class AbstractOioChannel extends AbstractChannel {
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof OioChildEventLoop;
return loop instanceof OioEventLoop;
}
@Override

View File

@ -1,106 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.SingleThreadEventLoop;
class OioChildEventLoop extends SingleThreadEventLoop {
private final OioEventLoop parent;
private AbstractOioChannel ch;
OioChildEventLoop(OioEventLoop parent) {
super(parent.threadFactory);
this.parent = parent;
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
return super.register(channel, future).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = (AbstractOioChannel) future.channel();
} else {
deregister();
}
}
});
}
@Override
protected void run() {
for (;;) {
AbstractOioChannel ch = this.ch;
if (ch == null || !ch.isActive()) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
} else {
long startTime = System.nanoTime();
for (;;) {
final Runnable task = pollTask();
if (task == null) {
break;
}
task.run();
// Ensure running tasks doesn't take too much time.
if (System.nanoTime() - startTime > AbstractOioChannel.SO_TIMEOUT * 1000000L) {
break;
}
}
ch.unsafe().read();
// Handle deregistration
if (!ch.isRegistered()) {
runAllTasks();
deregister();
}
}
if (isShutdown()) {
if (ch != null) {
ch.unsafe().close(ch.unsafe().voidFuture());
}
if (peekTask() == null) {
break;
}
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
interruptThread();
}
private void deregister() {
ch = null;
parent.activeChildren.remove(this);
parent.idleChildren.add(this);
}
}

View File

@ -15,273 +15,92 @@
*/
package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventExecutor;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.SingleThreadEventLoop;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class OioEventLoop implements EventLoop {
class OioEventLoop extends SingleThreadEventLoop {
private final int maxChannels;
final ThreadFactory threadFactory;
final Set<OioChildEventLoop> activeChildren = Collections.newSetFromMap(
new ConcurrentHashMap<OioChildEventLoop, Boolean>());
final Queue<OioChildEventLoop> idleChildren = new ConcurrentLinkedQueue<OioChildEventLoop>();
private final ChannelException tooManyChannels;
private final Unsafe unsafe = new Unsafe() {
@Override
public EventExecutor nextChild() {
throw new UnsupportedOperationException();
}
};
private final OioEventLoopGroup parent;
private AbstractOioChannel ch;
public OioEventLoop() {
this(0);
}
public OioEventLoop(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
}
public OioEventLoop(int maxChannels, ThreadFactory threadFactory) {
if (maxChannels < 0) {
throw new IllegalArgumentException(String.format(
"maxChannels: %d (expected: >= 0)", maxChannels));
}
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.maxChannels = maxChannels;
this.threadFactory = threadFactory;
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
tooManyChannels.setStackTrace(new StackTraceElement[0]);
}
@Override
public Unsafe unsafe() {
return unsafe;
}
@Override
public void shutdown() {
for (EventLoop l: activeChildren) {
l.shutdown();
}
for (EventLoop l: idleChildren) {
l.shutdown();
}
}
@Override
public List<Runnable> shutdownNow() {
for (EventLoop l: activeChildren) {
l.shutdownNow();
}
for (EventLoop l: idleChildren) {
l.shutdownNow();
}
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
for (EventLoop l: activeChildren) {
if (!l.isShutdown()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
for (EventLoop l: activeChildren) {
if (!l.isTerminated()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
for (EventLoop l: activeChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
for (EventLoop l: idleChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return currentEventLoop().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return currentEventLoop().submit(task, result);
}
@Override
public Future<?> submit(Runnable task) {
return currentEventLoop().submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return currentEventLoop().invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return currentEventLoop().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return currentEventLoop().invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
currentEventLoop().execute(command);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
return currentEventLoop().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return currentEventLoop().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
@Override
public ChannelFuture register(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
return nextChild().register(channel);
} catch (Throwable t) {
return channel.newFailedFuture(t);
}
OioEventLoop(OioEventLoopGroup parent) {
super(parent, parent.threadFactory);
this.parent = parent;
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
return nextChild().register(channel, future);
} catch (Throwable t) {
return channel.newFailedFuture(t);
}
}
@Override
public boolean inEventLoop() {
return SingleThreadEventExecutor.currentEventLoop() != null;
}
@Override
public boolean inEventLoop(Thread thread) {
throw new UnsupportedOperationException();
}
private EventLoop nextChild() {
OioChildEventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
return super.register(channel, future).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ch = (AbstractOioChannel) future.channel();
} else {
deregister();
}
}
loop = new OioChildEventLoop(this);
}
activeChildren.add(loop);
return loop;
});
}
private static OioChildEventLoop currentEventLoop() {
OioChildEventLoop loop =
(OioChildEventLoop) SingleThreadEventExecutor.currentEventLoop();
if (loop == null) {
throw new IllegalStateException("not called from an event loop thread");
@Override
protected void run() {
for (;;) {
AbstractOioChannel ch = this.ch;
if (ch == null || !ch.isActive()) {
Runnable task;
try {
task = takeTask();
task.run();
} catch (InterruptedException e) {
// Waken up by interruptThread()
}
} else {
long startTime = System.nanoTime();
for (;;) {
final Runnable task = pollTask();
if (task == null) {
break;
}
task.run();
// Ensure running tasks doesn't take too much time.
if (System.nanoTime() - startTime > AbstractOioChannel.SO_TIMEOUT * 1000000L) {
break;
}
}
ch.unsafe().read();
// Handle deregistration
if (!ch.isRegistered()) {
runAllTasks();
deregister();
}
}
if (isShutdown()) {
if (ch != null) {
ch.unsafe().close(ch.unsafe().voidFuture());
}
if (peekTask() == null) {
break;
}
}
}
return loop;
}
@Override
protected void wakeup(boolean inEventLoop) {
interruptThread();
}
private void deregister() {
ch = null;
parent.activeChildren.remove(this);
parent.idleChildren.add(this);
}
}

View File

@ -0,0 +1,176 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class OioEventLoopGroup implements EventLoopGroup {
private final int maxChannels;
final ThreadFactory threadFactory;
final Set<OioEventLoop> activeChildren = Collections.newSetFromMap(
new ConcurrentHashMap<OioEventLoop, Boolean>());
final Queue<OioEventLoop> idleChildren = new ConcurrentLinkedQueue<OioEventLoop>();
private final ChannelException tooManyChannels;
public OioEventLoopGroup() {
this(0);
}
public OioEventLoopGroup(int maxChannels) {
this(maxChannels, Executors.defaultThreadFactory());
}
public OioEventLoopGroup(int maxChannels, ThreadFactory threadFactory) {
if (maxChannels < 0) {
throw new IllegalArgumentException(String.format(
"maxChannels: %d (expected: >= 0)", maxChannels));
}
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.maxChannels = maxChannels;
this.threadFactory = threadFactory;
tooManyChannels = new ChannelException("too many channels (max: " + maxChannels + ')');
tooManyChannels.setStackTrace(new StackTraceElement[0]);
}
@Override
public EventLoop next() {
throw new UnsupportedOperationException();
}
@Override
public void shutdown() {
for (EventLoop l: activeChildren) {
l.shutdown();
}
for (EventLoop l: idleChildren) {
l.shutdown();
}
}
@Override
public boolean isShutdown() {
for (EventLoop l: activeChildren) {
if (!l.isShutdown()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
for (EventLoop l: activeChildren) {
if (!l.isTerminated()) {
return false;
}
}
for (EventLoop l: idleChildren) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
for (EventLoop l: activeChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
for (EventLoop l: idleChildren) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
return isTerminated();
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
@Override
public ChannelFuture register(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
return nextChild().register(channel);
} catch (Throwable t) {
return channel.newFailedFuture(t);
}
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
if (channel == null) {
throw new NullPointerException("channel");
}
try {
return nextChild().register(channel, future);
} catch (Throwable t) {
return channel.newFailedFuture(t);
}
}
private EventLoop nextChild() {
OioEventLoop loop = idleChildren.poll();
if (loop == null) {
if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
throw tooManyChannels;
}
loop = new OioEventLoop(this);
}
activeChildren.add(loop);
return loop;
}
}

View File

@ -256,7 +256,7 @@ public class SingleThreadEventLoopTest {
final AtomicInteger cleanedUp = new AtomicInteger();
SingleThreadEventLoopImpl() {
super(Executors.defaultThreadFactory());
super(null, Executors.defaultThreadFactory());
}
@Override

View File

@ -42,12 +42,12 @@ public class LocalChannelRegistryTest {
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
cb.eventLoop(new LocalEventLoop())
cb.group(new LocalEventLoopGroup())
.channel(new LocalChannel())
.remoteAddress(addr)
.handler(new TestHandler());
sb.eventLoop(new LocalEventLoop(), new LocalEventLoop())
sb.group(new LocalEventLoopGroup(), new LocalEventLoopGroup())
.channel(new LocalServerChannel())
.localAddress(addr)
.childHandler(new ChannelInitializer<LocalChannel>() {

View File

@ -29,9 +29,9 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.DefaultEventExecutor;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
import io.netty.channel.DefaultEventExecutorGroup;
import io.netty.channel.EventExecutorGroup;
import io.netty.channel.EventLoopGroup;
import java.util.HashSet;
import java.util.Queue;
@ -57,7 +57,7 @@ public class LocalTransportThreadModelTest {
public static void init() {
// Configure a test server
sb = new ServerBootstrap();
sb.eventLoop(new LocalEventLoop(), new LocalEventLoop())
sb.group(new LocalEventLoopGroup(), new LocalEventLoopGroup())
.channel(new LocalServerChannel())
.localAddress(LocalAddress.ANY)
.childHandler(new ChannelInitializer<LocalChannel>() {
@ -89,9 +89,9 @@ public class LocalTransportThreadModelTest {
@Test(timeout = 5000)
public void testStagedExecution() throws Throwable {
EventLoop l = new LocalEventLoop(4, new PrefixThreadFactory("l"));
EventExecutor e1 = new DefaultEventExecutor(4, new PrefixThreadFactory("e1"));
EventExecutor e2 = new DefaultEventExecutor(4, new PrefixThreadFactory("e2"));
EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e2"));
ThreadNameAuditor h1 = new ThreadNameAuditor();
ThreadNameAuditor h2 = new ThreadNameAuditor();
ThreadNameAuditor h3 = new ThreadNameAuditor();
@ -206,12 +206,12 @@ public class LocalTransportThreadModelTest {
@Test(timeout = 60000)
public void testConcurrentMessageBufferAccess() throws Throwable {
EventLoop l = new LocalEventLoop(4, new PrefixThreadFactory("l"));
EventExecutor e1 = new DefaultEventExecutor(4, new PrefixThreadFactory("e1"));
EventExecutor e2 = new DefaultEventExecutor(4, new PrefixThreadFactory("e2"));
EventExecutor e3 = new DefaultEventExecutor(4, new PrefixThreadFactory("e3"));
EventExecutor e4 = new DefaultEventExecutor(4, new PrefixThreadFactory("e4"));
EventExecutor e5 = new DefaultEventExecutor(4, new PrefixThreadFactory("e5"));
EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e5"));
try {
final MessageForwarder1 h1 = new MessageForwarder1();