Add EventLoopFactory and make MultithreadEventLoop use it

- based on the feed back from @normanmaurer
This commit is contained in:
Trustin Lee 2012-05-11 21:47:07 +09:00
parent 97c07708a2
commit c57d7dd098
5 changed files with 32 additions and 16 deletions

View File

@ -26,8 +26,6 @@ import io.netty.channel.MultithreadEventLoop;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.nio.SelectorEventLoop; import io.netty.channel.socket.nio.SelectorEventLoop;
import io.netty.handler.logging.LoggingHandler;
import io.netty.logging.InternalLogLevel;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -52,10 +50,10 @@ public class EchoClient {
} }
public void run() throws Exception { public void run() throws Exception {
EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.class); EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.FACTORY);
SocketChannel s = new NioSocketChannel(); SocketChannel s = new NioSocketChannel();
s.config().setTcpNoDelay(true); s.config().setTcpNoDelay(true);
s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); //s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter<Byte>() { s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter<Byte>() {
private final ChannelBuffer firstMessage; private final ChannelBuffer firstMessage;

View File

@ -49,7 +49,7 @@ public class EchoServer {
public void run() throws Exception { public void run() throws Exception {
// Configure the server. // Configure the server.
final EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.class); final EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.FACTORY);
ServerSocketChannel ssc = new NioServerSocketChannel(); ServerSocketChannel ssc = new NioServerSocketChannel();
ssc.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); ssc.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
ssc.pipeline().addLast("acceptor", new ChannelInboundHandlerAdapter<SocketChannel>() { ssc.pipeline().addLast("acceptor", new ChannelInboundHandlerAdapter<SocketChannel>() {
@ -72,7 +72,7 @@ public class EchoServer {
break; break;
} }
s.config().setTcpNoDelay(true); s.config().setTcpNoDelay(true);
s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); //s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO));
s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter<Byte>() { s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter<Byte>() {
@Override @Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelInboundHandlerContext<Byte> ctx) { public ChannelBufferHolder<Byte> newInboundBuffer(ChannelInboundHandlerContext<Byte> ctx) {

View File

@ -0,0 +1,7 @@
package io.netty.channel;
import java.util.concurrent.ThreadFactory;
public interface EventLoopFactory<T extends EventLoop> {
T newEventLoop(ThreadFactory threadFactory) throws Exception;
}

View File

@ -18,20 +18,21 @@ public class MultithreadEventLoop implements EventLoop {
private final EventLoop[] children; private final EventLoop[] children;
private final AtomicInteger childIndex = new AtomicInteger(); private final AtomicInteger childIndex = new AtomicInteger();
public MultithreadEventLoop(Class<? extends SingleThreadEventLoop> loopType) { public MultithreadEventLoop(EventLoopFactory<? extends SingleThreadEventLoop> loopFactory) {
this(loopType, Runtime.getRuntime().availableProcessors() * 2); this(loopFactory, Runtime.getRuntime().availableProcessors() * 2);
} }
public MultithreadEventLoop(Class<? extends SingleThreadEventLoop> loopType, int nThreads) { public MultithreadEventLoop(EventLoopFactory<? extends SingleThreadEventLoop> loopFactory, int nThreads) {
this(loopType, nThreads, Executors.defaultThreadFactory()); this(loopFactory, nThreads, Executors.defaultThreadFactory());
} }
public MultithreadEventLoop(Class<? extends SingleThreadEventLoop> loopType, int nThreads, ThreadFactory threadFactory) { public MultithreadEventLoop(EventLoopFactory<? extends SingleThreadEventLoop> loopFactory, int nThreads, ThreadFactory threadFactory) {
if (loopType == null) { if (loopFactory == null) {
throw new NullPointerException("loopType"); throw new NullPointerException("loopFactory");
} }
if (nThreads <= 0) { if (nThreads <= 0) {
throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)"); throw new IllegalArgumentException(String.format(
"nThreads: %d (expected: > 0)", nThreads));
} }
if (threadFactory == null) { if (threadFactory == null) {
throw new NullPointerException("threadFactory"); throw new NullPointerException("threadFactory");
@ -41,10 +42,10 @@ public class MultithreadEventLoop implements EventLoop {
for (int i = 0; i < nThreads; i ++) { for (int i = 0; i < nThreads; i ++) {
boolean success = false; boolean success = false;
try { try {
children[i] = loopType.getConstructor(ThreadFactory.class).newInstance(threadFactory); children[i] = loopFactory.newEventLoop(threadFactory);
success = true; success = true;
} catch (Exception e) { } catch (Exception e) {
throw new EventLoopException("failed to create a child event loop: " + loopType.getName(), e); throw new EventLoopException("failed to create a child event loop", e);
} finally { } finally {
if (!success) { if (!success) {
for (int j = 0; j < i; j ++) { for (int j = 0; j < i; j ++) {

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopFactory;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
@ -35,6 +36,15 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class SelectorEventLoop extends SingleThreadEventLoop { public class SelectorEventLoop extends SingleThreadEventLoop {
public static final EventLoopFactory<SelectorEventLoop> FACTORY = new EventLoopFactory<SelectorEventLoop>() {
@Override
public SelectorEventLoop newEventLoop(ThreadFactory threadFactory)
throws Exception {
return new SelectorEventLoop(threadFactory);
}
};
/** /**
* Internal Netty logger. * Internal Netty logger.
*/ */