diff --git a/example/src/main/java/io/netty/example/echo/EchoClient.java b/example/src/main/java/io/netty/example/echo/EchoClient.java index cb88e01015..b4174e4be1 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClient.java +++ b/example/src/main/java/io/netty/example/echo/EchoClient.java @@ -22,11 +22,10 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; -import io.netty.channel.MultithreadEventLoop; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.SelectorEventLoop; +import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.logging.InternalLogLevel; import java.net.InetSocketAddress; @@ -50,7 +49,7 @@ public class EchoClient { public void run() throws Exception { // Create the required event loop. - EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.FACTORY); + EventLoop loop = new SelectorEventLoop(); try { // Configure the client. ChannelBuilder b = new ChannelBuilder(); @@ -62,7 +61,7 @@ public class EchoClient { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); - p.addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); + p.addLast("logger", new LoggingHandler(LogLevel.INFO)); p.addLast("echoer", new EchoClientHandler(firstMessageSize)); } }); diff --git a/example/src/main/java/io/netty/example/echo/EchoServer.java b/example/src/main/java/io/netty/example/echo/EchoServer.java index 9915d52339..88297c82cb 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServer.java +++ b/example/src/main/java/io/netty/example/echo/EchoServer.java @@ -21,12 +21,11 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; -import io.netty.channel.MultithreadEventLoop; import io.netty.channel.ServerChannelBuilder; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.SelectorEventLoop; +import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.logging.InternalLogLevel; import java.net.InetSocketAddress; @@ -43,21 +42,22 @@ public class EchoServer { public void run() throws Exception { // Create the required event loops. - EventLoop parentLoop = new MultithreadEventLoop(SelectorEventLoop.FACTORY); - EventLoop childLoop = new MultithreadEventLoop(SelectorEventLoop.FACTORY); + EventLoop parentLoop = new SelectorEventLoop(); + EventLoop childLoop = new SelectorEventLoop(); try { // Configure the server. ServerChannelBuilder b = new ServerChannelBuilder(); b.parentEventLoop(parentLoop) - .childEventLoop(childLoop) .parentChannel(new NioServerSocketChannel()) - .childOption(ChannelOption.TCP_NODELAY, true) + .parentOption(ChannelOption.SO_BACKLOG, 24) .localAddress(new InetSocketAddress(port)) + .childEventLoop(childLoop) + .childOption(ChannelOption.TCP_NODELAY, true) .childInitializer(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); - p.addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); + p.addLast("logger", new LoggingHandler(LogLevel.INFO)); p.addLast("echoer", new EchoServerHandler()); } }); diff --git a/handler/src/main/java/io/netty/handler/logging/LogLevel.java b/handler/src/main/java/io/netty/handler/logging/LogLevel.java new file mode 100644 index 0000000000..82423b7bd6 --- /dev/null +++ b/handler/src/main/java/io/netty/handler/logging/LogLevel.java @@ -0,0 +1,20 @@ +package io.netty.handler.logging; + +import io.netty.logging.InternalLogLevel; + +public enum LogLevel { + DEBUG(InternalLogLevel.DEBUG), + INFO(InternalLogLevel.INFO), + WARN(InternalLogLevel.WARN), + ERROR(InternalLogLevel.ERROR); + + private final InternalLogLevel internalLevel; + + LogLevel(InternalLogLevel internalLevel) { + this.internalLevel = internalLevel; + } + + InternalLogLevel toInternalLevel() { + return internalLevel; + } +} diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index f8ffa7a0b4..a25db516c8 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -42,7 +42,7 @@ import java.util.Queue; @Sharable public class LoggingHandler extends ChannelHandlerAdapter { - private static final InternalLogLevel DEFAULT_LEVEL = InternalLogLevel.DEBUG; + private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG; private static final String NEWLINE = String.format("%n"); private static final String[] BYTE2HEX = new String[256]; @@ -104,7 +104,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { } private final InternalLogger logger; - private final InternalLogLevel level; + private final LogLevel level; + private final InternalLogLevel internalLevel; /** * Creates a new instance whose logger name is the fully qualified class @@ -120,13 +121,14 @@ public class LoggingHandler extends ChannelHandlerAdapter { * * @param level the log level */ - public LoggingHandler(InternalLogLevel level) { + public LoggingHandler(LogLevel level) { if (level == null) { throw new NullPointerException("level"); } logger = InternalLoggerFactory.getInstance(getClass()); this.level = level; + internalLevel = level.toInternalLevel(); } /** @@ -142,7 +144,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { * * @param level the log level */ - public LoggingHandler(Class clazz, InternalLogLevel level) { + public LoggingHandler(Class clazz, LogLevel level) { if (clazz == null) { throw new NullPointerException("clazz"); } @@ -151,34 +153,22 @@ public class LoggingHandler extends ChannelHandlerAdapter { } logger = InternalLoggerFactory.getInstance(clazz); this.level = level; - } - - /** - * Creates a new instance with the specified logger name and with hex dump - * enabled. - */ - public LoggingHandler(String name) { - this(name, true); + internalLevel = level.toInternalLevel(); } /** * Creates a new instance with the specified logger name. - * - * @param hexDump {@code true} if and only if the hex dump of the received - * message is logged */ - public LoggingHandler(String name, boolean hexDump) { - this(name, DEFAULT_LEVEL, hexDump); + public LoggingHandler(String name) { + this(name, DEFAULT_LEVEL); } /** * Creates a new instance with the specified logger name. * * @param level the log level - * @param hexDump {@code true} if and only if the hex dump of the received - * message is logged */ - public LoggingHandler(String name, InternalLogLevel level, boolean hexDump) { + public LoggingHandler(String name, LogLevel level) { if (name == null) { throw new NullPointerException("name"); } @@ -187,6 +177,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { } logger = InternalLoggerFactory.getInstance(name); this.level = level; + internalLevel = level.toInternalLevel(); } /** @@ -201,7 +192,7 @@ public class LoggingHandler extends ChannelHandlerAdapter { * Returns the {@link InternalLogLevel} that this handler uses to log * a {@link ChannelEvent}. */ - public InternalLogLevel getLevel() { + public LogLevel getLevel() { return level; } @@ -306,8 +297,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void channelRegistered(ChannelInboundHandlerContext ctx) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "REGISTERED")); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "REGISTERED")); } super.channelRegistered(ctx); } @@ -315,8 +306,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void channelUnregistered(ChannelInboundHandlerContext ctx) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "UNREGISTERED")); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "UNREGISTERED")); } super.channelUnregistered(ctx); } @@ -324,8 +315,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelInboundHandlerContext ctx) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "ACTIVE")); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "ACTIVE")); } super.channelActive(ctx); } @@ -333,8 +324,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "INACTIVE")); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "INACTIVE")); } super.channelInactive(ctx); } @@ -342,8 +333,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelInboundHandlerContext ctx, Throwable cause) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "EXCEPTION: " + cause), cause); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "EXCEPTION: " + cause), cause); } super.exceptionCaught(ctx, cause); } @@ -351,8 +342,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void userEventTriggered(ChannelInboundHandlerContext ctx, Object evt) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "USER_EVENT: " + evt)); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "USER_EVENT: " + evt)); } super.userEventTriggered(ctx, evt); } @@ -360,8 +351,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, formatBuffer("INBUF", ctx.in()))); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, formatBuffer("INBUF", ctx.in()))); } ctx.fireInboundBufferUpdated(); } @@ -369,8 +360,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void bind(ChannelOutboundHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "BIND(" + localAddress + ')')); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "BIND(" + localAddress + ')')); } super.bind(ctx, localAddress, future); } @@ -379,8 +370,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { public void connect(ChannelOutboundHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "CONNECT(" + remoteAddress + ", " + localAddress + ')')); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "CONNECT(" + remoteAddress + ", " + localAddress + ')')); } super.connect(ctx, remoteAddress, localAddress, future); } @@ -388,8 +379,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void disconnect(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "DISCONNECT()")); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "DISCONNECT()")); } super.disconnect(ctx, future); } @@ -397,8 +388,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void close(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "CLOSE()")); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "CLOSE()")); } super.close(ctx, future); } @@ -406,8 +397,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void deregister(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, "DEREGISTER()")); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "DEREGISTER()")); } super.deregister(ctx, future); } @@ -415,8 +406,8 @@ public class LoggingHandler extends ChannelHandlerAdapter { @Override public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { - if (getLogger().isEnabled(level)) { - logger.log(level, format(ctx, formatBuffer("OUTBUF", ctx.prevOut()))); + if (getLogger().isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, formatBuffer("OUTBUF", ctx.prevOut()))); } ctx.flush(future); } diff --git a/transport/src/main/java/io/netty/channel/ChannelInitializer.java b/transport/src/main/java/io/netty/channel/ChannelInitializer.java index f5aa605fe1..e73d2baf6b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInitializer.java +++ b/transport/src/main/java/io/netty/channel/ChannelInitializer.java @@ -43,7 +43,6 @@ public abstract class ChannelInitializer extends ChannelInboundHandlerAdapter loopFactory) { - this(loopFactory, Runtime.getRuntime().availableProcessors() * 2); + this(loopFactory, DEFAULT_POOL_SIZE); } public MultithreadEventLoop(EventLoopFactory loopFactory, int nThreads) { - this(loopFactory, nThreads, Executors.defaultThreadFactory()); + this(loopFactory, nThreads, DEFAULT_THREAD_FACTORY); } public MultithreadEventLoop(EventLoopFactory loopFactory, int nThreads, ThreadFactory threadFactory) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index c47ff7403f..0c987bdb5e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -37,11 +37,6 @@ public abstract class AbstractNioChannel extends AbstractChannel { this.ch = ch; } - @Override - public SelectorEventLoop eventLoop() { - return (SelectorEventLoop) super.eventLoop(); - } - @Override protected SelectableChannel javaChannel() { return ch; @@ -84,12 +79,12 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected boolean isCompatible(EventLoop loop) { - return loop instanceof SelectorEventLoop; + return loop instanceof SingleThreadSelectorEventLoop; } @Override protected void doRegister() throws Exception { - SelectorEventLoop loop = eventLoop(); + SingleThreadSelectorEventLoop loop = (SingleThreadSelectorEventLoop) eventLoop(); selectionKey = javaChannel().register(loop.selector, isActive()? SelectionKey.OP_READ : 0, this); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index 81e8bbfddd..63186549fd 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -43,7 +43,7 @@ import java.util.concurrent.Executor; * A class responsible for registering channels with {@link Selector}. * It also implements the {@link Selector} loop. */ -public class NioDatagramWorker extends SelectorEventLoop { +public class NioDatagramWorker extends SingleThreadSelectorEventLoop { /** * Sole constructor. diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 2b2ebffae3..68773515d9 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -74,11 +74,6 @@ public class NioServerSocketChannel extends AbstractServerChannel return config; } - @Override - public SelectorEventLoop eventLoop() { - return (SelectorEventLoop) super.eventLoop(); - } - @Override public boolean isActive() { return javaChannel().socket().isBound(); @@ -116,12 +111,12 @@ public class NioServerSocketChannel extends AbstractServerChannel @Override protected boolean isCompatible(EventLoop loop) { - return loop instanceof SelectorEventLoop; + return loop instanceof SingleThreadSelectorEventLoop; } @Override protected void doRegister() throws Exception { - SelectorEventLoop loop = eventLoop(); + SingleThreadSelectorEventLoop loop = (SingleThreadSelectorEventLoop) eventLoop(); selectionKey = javaChannel().register( loop.selector, isActive()? SelectionKey.OP_ACCEPT : 0, this); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index d1da449009..727ec63e39 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -152,7 +152,7 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha @Override protected void doDeregister() throws Exception { selectionKey().cancel(); - eventLoop().cancelledKeys ++; + ((SingleThreadSelectorEventLoop) eventLoop()).cancelledKeys ++; } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java index bad413caca..e15fe599e1 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java @@ -1,253 +1,32 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ package io.netty.channel.socket.nio; -import io.netty.channel.Channel; -import io.netty.channel.Channel.Unsafe; -import io.netty.channel.ChannelException; import io.netty.channel.EventLoopFactory; -import io.netty.channel.SingleThreadEventLoop; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; +import io.netty.channel.MultithreadEventLoop; -import java.io.IOException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicBoolean; -public class SelectorEventLoop extends SingleThreadEventLoop { - - public static final EventLoopFactory FACTORY = new EventLoopFactory() { - @Override - public SelectorEventLoop newEventLoop(ThreadFactory threadFactory) - throws Exception { - return new SelectorEventLoop(threadFactory); - } - }; - - /** - * Internal Netty logger. - */ - protected static final InternalLogger logger = InternalLoggerFactory - .getInstance(SelectorEventLoop.class); - - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. - - /** - * The NIO {@link Selector}. - */ - protected final Selector selector; - - /** - * Boolean that controls determines if a blocked Selector.select should - * break out of its selection process. In our case we use a timeone for - * the select method and the select method will block for that time unless - * waken up. - */ - protected final AtomicBoolean wakenUp = new AtomicBoolean(); - - int cancelledKeys; +public class SelectorEventLoop extends MultithreadEventLoop { public SelectorEventLoop() { - this(Executors.defaultThreadFactory()); + this(DEFAULT_POOL_SIZE); } - public SelectorEventLoop(ThreadFactory threadFactory) { - this(threadFactory, SelectorProvider.provider()); + public SelectorEventLoop(int nThreads) { + this(nThreads, DEFAULT_THREAD_FACTORY); } - public SelectorEventLoop(SelectorProvider selectorProvider) { - this(Executors.defaultThreadFactory(), selectorProvider); + public SelectorEventLoop(int nThreads, ThreadFactory threadFactory) { + this(nThreads, threadFactory, SelectorProvider.provider()); } - public SelectorEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) { - super(threadFactory); - if (selectorProvider == null) { - throw new NullPointerException("selectorProvider"); - } - selector = openSelector(selectorProvider); - } - - private static Selector openSelector(SelectorProvider provider) { - try { - return provider.openSelector(); - } catch (IOException e) { - throw new ChannelException("failed to open a new selector", e); - } - } - - @Override - protected void run() { - Selector selector = this.selector; - for (;;) { - - wakenUp.set(false); - - try { - SelectorUtil.select(selector); - - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). - - if (wakenUp.get()) { - selector.wakeup(); - } - - cancelledKeys = 0; - processTaskQueue(); - processSelectedKeys(); - - if (isShutdown()) { - closeAll(); - break; - } - } catch (Throwable t) { - logger.warn( - "Unexpected exception in the selector loop.", t); - - // Prevent possible consecutive immediate failures that lead to - // excessive CPU consumption. - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore. - } - } - } - } - - @Override - protected void cleanup() { - try { - selector.close(); - } catch (IOException e) { - logger.warn( - "Failed to close a selector.", e); - } - } - - private void processTaskQueue() { - for (;;) { - final Runnable task = pollTask(); - if (task == null) { - break; + public SelectorEventLoop(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { + super(new EventLoopFactory() { + @Override + public SingleThreadSelectorEventLoop newEventLoop(ThreadFactory threadFactory) throws Exception { + return new SingleThreadSelectorEventLoop(threadFactory, selectorProvider); } - task.run(); - cleanUpCancelledKeys(); - } - } - - private void processSelectedKeys() { - for (Iterator i = selector.selectedKeys().iterator(); i.hasNext();) { - final SelectionKey k = i.next(); - final Channel ch = (Channel) k.attachment(); - final Unsafe unsafe = ch.unsafe(); - boolean removeKey = true; - try { - int readyOps = k.readyOps(); - if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { - unsafe.read(); - if (!ch.isOpen()) { - // Connection already closed - no need to handle write. - continue; - } - } - if ((readyOps & SelectionKey.OP_WRITE) != 0) { - unsafe.flushForcibly(); - } - if ((readyOps & SelectionKey.OP_ACCEPT) != 0) { - unsafe.read(); - } - if ((readyOps & SelectionKey.OP_CONNECT) != 0) { - unsafe.finishConnect(); - } - } catch (CancelledKeyException ignored) { - unsafe.close(unsafe.voidFuture()); - } finally { - if (removeKey) { - i.remove(); - } - } - - if (cleanUpCancelledKeys()) { - break; // break the loop to avoid ConcurrentModificationException - } - } - } - - private boolean cleanUpCancelledKeys() { - if (cancelledKeys >= CLEANUP_INTERVAL) { - cancelledKeys = 0; - SelectorUtil.cleanupKeys(selector); - return true; - } - return false; - } - - private void closeAll() { - SelectorUtil.cleanupKeys(selector); - Set keys = selector.keys(); - Collection channels = new ArrayList(keys.size()); - for (SelectionKey k: keys) { - channels.add((Channel) k.attachment()); - } - - for (Channel ch: channels) { - ch.unsafe().close(ch.unsafe().voidFuture()); - } - } - - @Override - protected void wakeup(boolean inEventLoop) { - if (wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } + }, nThreads, threadFactory); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SingleThreadSelectorEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/SingleThreadSelectorEventLoop.java new file mode 100644 index 0000000000..245c3e1590 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/SingleThreadSelectorEventLoop.java @@ -0,0 +1,231 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import io.netty.channel.Channel; +import io.netty.channel.Channel.Unsafe; +import io.netty.channel.ChannelException; +import io.netty.channel.SingleThreadEventLoop; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; + +import java.io.IOException; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; + +final class SingleThreadSelectorEventLoop extends SingleThreadEventLoop { + + /** + * Internal Netty logger. + */ + protected static final InternalLogger logger = InternalLoggerFactory + .getInstance(SingleThreadSelectorEventLoop.class); + + static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. + + /** + * The NIO {@link Selector}. + */ + protected final Selector selector; + + /** + * Boolean that controls determines if a blocked Selector.select should + * break out of its selection process. In our case we use a timeone for + * the select method and the select method will block for that time unless + * waken up. + */ + protected final AtomicBoolean wakenUp = new AtomicBoolean(); + + int cancelledKeys; + + SingleThreadSelectorEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) { + super(threadFactory); + if (selectorProvider == null) { + throw new NullPointerException("selectorProvider"); + } + selector = openSelector(selectorProvider); + } + + private static Selector openSelector(SelectorProvider provider) { + try { + return provider.openSelector(); + } catch (IOException e) { + throw new ChannelException("failed to open a new selector", e); + } + } + + @Override + protected void run() { + Selector selector = this.selector; + for (;;) { + + wakenUp.set(false); + + try { + SelectorUtil.select(selector); + + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). + + if (wakenUp.get()) { + selector.wakeup(); + } + + cancelledKeys = 0; + processTaskQueue(); + processSelectedKeys(); + + if (isShutdown()) { + closeAll(); + break; + } + } catch (Throwable t) { + logger.warn( + "Unexpected exception in the selector loop.", t); + + // Prevent possible consecutive immediate failures that lead to + // excessive CPU consumption. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore. + } + } + } + } + + @Override + protected void cleanup() { + try { + selector.close(); + } catch (IOException e) { + logger.warn( + "Failed to close a selector.", e); + } + } + + private void processTaskQueue() { + for (;;) { + final Runnable task = pollTask(); + if (task == null) { + break; + } + + task.run(); + cleanUpCancelledKeys(); + } + } + + private void processSelectedKeys() { + for (Iterator i = selector.selectedKeys().iterator(); i.hasNext();) { + final SelectionKey k = i.next(); + final Channel ch = (Channel) k.attachment(); + final Unsafe unsafe = ch.unsafe(); + boolean removeKey = true; + try { + int readyOps = k.readyOps(); + if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { + unsafe.read(); + if (!ch.isOpen()) { + // Connection already closed - no need to handle write. + continue; + } + } + if ((readyOps & SelectionKey.OP_WRITE) != 0) { + unsafe.flushForcibly(); + } + if ((readyOps & SelectionKey.OP_ACCEPT) != 0) { + unsafe.read(); + } + if ((readyOps & SelectionKey.OP_CONNECT) != 0) { + unsafe.finishConnect(); + } + } catch (CancelledKeyException ignored) { + unsafe.close(unsafe.voidFuture()); + } finally { + if (removeKey) { + i.remove(); + } + } + + if (cleanUpCancelledKeys()) { + break; // break the loop to avoid ConcurrentModificationException + } + } + } + + private boolean cleanUpCancelledKeys() { + if (cancelledKeys >= CLEANUP_INTERVAL) { + cancelledKeys = 0; + SelectorUtil.cleanupKeys(selector); + return true; + } + return false; + } + + private void closeAll() { + SelectorUtil.cleanupKeys(selector); + Set keys = selector.keys(); + Collection channels = new ArrayList(keys.size()); + for (SelectionKey k: keys) { + channels.add((Channel) k.attachment()); + } + + for (Channel ch: channels) { + ch.unsafe().close(ch.unsafe().voidFuture()); + } + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (wakenUp.compareAndSet(false, true)) { + selector.wakeup(); + } + } +}