/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel.nio;

import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * {@link SingleThreadEventLoop} implementation which registers {@link Channel}s to a
 * {@link Selector} and so does the multiplexing of these in the event loop.
 */
public final class NioEventLoop extends SingleThreadEventLoop {

    /**
     * Internal Netty logger.
     */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);

    /** Number of {@link #cancel(SelectionKey)} calls after which {@link #cleanupKeys()} is triggered. */
    private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.

    /** Value of the {@code io.netty.epollBugWorkaround} system property (default {@code false}). */
    private static final boolean EPOLL_BUG_WORKAROUND =
            SystemPropertyUtil.getBoolean("io.netty.epollBugWorkaround", false);

    /** Select timeout in milliseconds, read from {@code io.netty.selectTimeout} (default {@code 500}). */
    private static final long SELECT_TIMEOUT = SystemPropertyUtil.getLong("io.netty.selectTimeout", 500);

    /** {@link #SELECT_TIMEOUT} converted to nanoseconds. */
    private static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT);

    // Workaround for JDK NIO bug.
    //
    // See:
    // - http://bugs.sun.com/view_bug.do?bug_id=6427854
    // - https://github.com/netty/netty/issues/203
    static {
        String key = "sun.nio.ch.bugLevel";
        try {
            String buglevel = System.getProperty(key);
            if (buglevel == null) {
                System.setProperty(key, "");
            }
        } catch (SecurityException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("Unable to get/set System Property '" + key + '\'', e);
            }
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Using select timeout of " + SELECT_TIMEOUT);
            logger.debug("Epoll-bug workaround enabled = " + EPOLL_BUG_WORKAROUND);
        }
    }

    /**
     * The NIO {@link Selector}. Replaced by {@link #rebuildSelector()}.
     */
    Selector selector;

    /** Provider used to open every {@link Selector} of this event loop. */
    private final SelectorProvider provider;

    /**
     * Boolean that determines whether a blocked {@code Selector.select} should
     * break out of its selection process. In our case we use a timeout for
     * the select method and the select method will block for that time unless
     * woken up.
     */
    private final AtomicBoolean wakenUp = new AtomicBoolean();

    /** Desired percentage of loop time spent on I/O; see {@link #setIoRatio(int)}. */
    private volatile int ioRatio = 50;

    /** Keys cancelled via {@link #cancel(SelectionKey)} since the counter was last reset. */
    private int cancelledKeys;

    /** Set by {@link #cancel(SelectionKey)} when it ran {@link #cleanupKeys()}; read by the key-processing loop. */
    private boolean cleanedCancelledKeys;

    /**
     * Creates a new event loop and opens its {@link Selector} from the given provider.
     *
     * @throws NullPointerException if {@code selectorProvider} is {@code null}
     * @throws ChannelException     if the selector cannot be opened
     */
    NioEventLoop(
            NioEventLoopGroup parent, ThreadFactory threadFactory,
            TaskScheduler scheduler, SelectorProvider selectorProvider) {
        super(parent, threadFactory, scheduler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }

    /**
     * Opens a new {@link Selector} using the configured {@link SelectorProvider}.
     *
     * @throws ChannelException wrapping the {@link IOException} if opening fails
     */
    private Selector openSelector() {
        try {
            return provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
    }

    @Override
    protected Queue<Runnable> newTaskQueue() {
        // This event loop never calls takeTask()
        return new ConcurrentLinkedQueue<Runnable>();
    }

    /**
     * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
     * of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
     * be executed by this event loop when the {@link SelectableChannel} is ready.
     *
     * @throws NullPointerException     if {@code ch} or {@code task} is {@code null}
     * @throws IllegalArgumentException if {@code interestOps} is zero or contains bits outside
     *                                  {@code ch.validOps()}
     * @throws IllegalStateException    if this event loop has been shut down
     * @throws EventLoopException       if the underlying registration fails
     */
    public void register(final SelectableChannel ch, final int interestOps, final NioTask task) {
        if (ch == null) {
            throw new NullPointerException("ch");
        }
        if (interestOps == 0) {
            throw new IllegalArgumentException("interestOps must be non-zero.");
        }
        if ((interestOps & ~ch.validOps()) != 0) {
            throw new IllegalArgumentException(
                    "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
        }
        if (task == null) {
            throw new NullPointerException("task");
        }

        if (isShutdown()) {
            throw new IllegalStateException("event loop shut down");
        }

        try {
            ch.register(selector, interestOps, task);
        } catch (Exception e) {
            throw new EventLoopException("failed to register a channel", e);
        }
    }

    /**
     * Queues {@code task} on the channel's {@code writableTasks} and makes sure {@code OP_WRITE} is part of the
     * interest set of the channel's {@link SelectionKey}.
     *
     * @throws NullPointerException  if {@code channel} is {@code null}
     * @throws IllegalStateException if this event loop has been shut down
     */
    void executeWhenWritable(AbstractNioChannel channel, NioTask task) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }

        if (isShutdown()) {
            throw new IllegalStateException("event loop shut down");
        }

        SelectionKey key = channel.selectionKey();
        channel.writableTasks.offer(task);

        int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }

    /**
     * Returns the percentage of the desired amount of time spent for I/O in the event loop.
     */
    public int getIoRatio() {
        return ioRatio;
    }

    /**
     * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
     * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
     *
     * @throws IllegalArgumentException if {@code ioRatio} is not in the open range {@code (0, 100)}
     */
    public void setIoRatio(int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio < 100)");
        }
        this.ioRatio = ioRatio;
    }

    /**
     * Replaces the current {@link Selector} of this event loop with a newly created {@link Selector} to work
     * around the infamous epoll 100% CPU bug. When called from outside the event loop the work is handed over
     * to the event loop via {@code execute(...)}.
     */
    public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector();
                }
            });
            return;
        }

        final Selector oldSelector = selector;
        final Selector newSelector;

        if (oldSelector == null) {
            return;
        }

        try {
            // Open the replacement through the configured provider so that it is of the same kind as the
            // selector created by the constructor.
            newSelector = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (;;) {
            try {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                    try {
                        if (key.channel().keyFor(newSelector) != null) {
                            // Already migrated (e.g. by an earlier pass that was restarted).
                            continue;
                        }

                        int interestOps = key.interestOps();
                        key.cancel();
                        key.channel().register(newSelector, interestOps, a);
                        nChannels ++;
                    } catch (Exception e) {
                        logger.warn("Failed to re-register a Channel to the new Selector.", e);
                        if (a instanceof AbstractNioChannel) {
                            AbstractNioChannel ch = (AbstractNioChannel) a;
                            ch.unsafe().close(ch.unsafe().voidFuture());
                        } else {
                            @SuppressWarnings("unchecked")
                            NioTask task = (NioTask) a;
                            invokeChannelUnregistered(task, key, e);
                        }
                    }
                }
            } catch (ConcurrentModificationException e) {
                // Probably due to concurrent modification of the key set.
                continue;
            }

            break;
        }

        selector = newSelector;

        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }

    /**
     * The selector loop: selects, processes the selected keys, runs queued tasks for a time budget derived
     * from {@link #getIoRatio()}, and exits once shutdown has been confirmed.
     */
    @Override
    protected void run() {
        // use 80% of the timeout for measure
        final long minSelectTimeout = SELECT_TIMEOUT_NANOS / 100 * 80;

        Selector selector = this.selector;
        int selectReturnsImmediately = 0;
        for (;;) {

            wakenUp.set(false);

            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    long beforeSelect = System.nanoTime();
                    int selected = select();
                    if (EPOLL_BUG_WORKAROUND) {
                        if (selected == 0) {
                            long timeBlocked = System.nanoTime() - beforeSelect;
                            if (timeBlocked < minSelectTimeout) {
                                // returned before the minSelectTimeout elapsed with nothing selected.
                                // this may be caused by the jdk epoll(..) bug, so increment the counter
                                // which we use later to see if it is really the jdk bug.
                                selectReturnsImmediately ++;
                            } else {
                                selectReturnsImmediately = 0;
                            }
                            if (selectReturnsImmediately == 10) {
                                // The selector returned immediately for 10 times in a row,
                                // so recreate one selector as it seems like we hit the
                                // famous epoll(..) jdk bug.
                                rebuildSelector();
                                selector = this.selector;
                                selectReturnsImmediately = 0;

                                // try to select again
                                continue;
                            }
                        } else {
                            // reset counter
                            selectReturnsImmediately = 0;
                        }
                    }

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;

                final long ioStartTime = System.nanoTime();
                processSelectedKeys();
                // Re-read the field after each phase; rebuildSelector() may have replaced it.
                selector = this.selector;
                final long ioTime = System.nanoTime() - ioStartTime;

                // Time budget for tasks = ioTime scaled by (non-I/O share / I/O share).
                final int ioRatio = this.ioRatio;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                selector = this.selector;

                if (isShutdown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                    // NOTE(review): the interrupt status is not restored here; presumably deliberate so
                    // that the loop keeps running - confirm before changing.
                }
            }
        }
    }

    /**
     * Closes the current {@link Selector}; a failure is logged and otherwise ignored.
     */
    @Override
    protected void cleanup() {
        try {
            selector.close();
        } catch (IOException e) {
            logger.warn("Failed to close a selector.", e);
        }
    }

    /**
     * Cancels {@code key} and, after every {@link #CLEANUP_INTERVAL} cancellations, runs
     * {@link #cleanupKeys()} and flags that this happened so that the key-processing loop can restart its
     * iteration.
     */
    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            cleanedCancelledKeys = true;
            cleanupKeys();
        }
    }

    /**
     * Dispatches every key in the selector's selected-key set to the matching {@code processSelectedKey}
     * overload, then clears the set unless it was already found empty after a mid-iteration cleanup.
     */
    private void processSelectedKeys() {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i;
        cleanedCancelledKeys = false;
        boolean clearSelectedKeys = true;
        try {
            for (i = selectedKeys.iterator(); i.hasNext();) {
                final SelectionKey k = i.next();
                final Object a = k.attachment();
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask task = (NioTask) a;
                    processSelectedKey(k, task);
                }

                if (cleanedCancelledKeys) {
                    // Create the iterator again to avoid ConcurrentModificationException
                    if (selectedKeys.isEmpty()) {
                        clearSelectedKeys = false;
                        break;
                    } else {
                        i = selectedKeys.iterator();
                    }
                }
            }
        } finally {
            if (clearSelectedKeys) {
                selectedKeys.clear();
            }
        }
    }

    /**
     * Handles a ready key whose attachment is an {@link AbstractNioChannel}: read/accept first, then write,
     * then connect. An invalid key or a {@link CancelledKeyException} closes the channel.
     */
    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidFuture());
            return;
        }

        // -1 marks "readyOps() has not been read yet" for the catch block below.
        int readyOps = -1;
        try {
            readyOps = k.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                processWritable(k, ch);
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            if (readyOps != -1 && (readyOps & SelectionKey.OP_WRITE) != 0) {
                unregisterWritableTasks(ch);
            }
            unsafe.close(unsafe.voidFuture());
        }
    }

    /**
     * Handles write readiness: flushes the channel when no writable tasks are queued, otherwise drains and
     * runs the queued tasks.
     */
    private static void processWritable(SelectionKey k, AbstractNioChannel ch) {
        if (ch.writableTasks.isEmpty()) {
            ch.unsafe().flushNow();
        } else {
            NioTask task;
            for (;;) {
                task = ch.writableTasks.poll();
                if (task == null) {
                    break;
                }
                processSelectedKey(ch.selectionKey(), task);
            }
            // NOTE(review): OP_WRITE is (re-)added after the tasks ran; presumably so that the next write
            // readiness reaches the flushNow() branch - confirm.
            k.interestOps(k.interestOps() | SelectionKey.OP_WRITE);
        }
    }

    /**
     * Drains the channel's {@code writableTasks} and notifies each task via
     * {@link #invokeChannelUnregistered} with a {@code null} cause.
     */
    private static void unregisterWritableTasks(AbstractNioChannel ch) {
        NioTask task;
        for (;;) {
            task = ch.writableTasks.poll();
            if (task == null) {
                break;
            } else {
                invokeChannelUnregistered(task, ch.selectionKey(), null);
            }
        }
    }

    /**
     * Runs {@code task.channelReady(...)} for a ready key and makes sure the task is told when its key ends
     * up cancelled.
     */
    private static void processSelectedKey(SelectionKey k, NioTask task) {
        // 0 = channelReady() did not complete and no Exception was caught (e.g. an Error),
        // 1 = completed normally, 2 = an Exception was caught and already handled.
        int state = 0;
        try {
            task.channelReady(k.channel(), k);
            state = 1;
        } catch (Exception e) {
            k.cancel();
            invokeChannelUnregistered(task, k, e);
            state = 2;
        } finally {
            switch (state) {
            case 0:
                k.cancel();
                invokeChannelUnregistered(task, k, null);
                break;
            case 1:
                if (!k.isValid()) { // Cancelled by channelReady()
                    invokeChannelUnregistered(task, k, null);
                }
                break;
            }
        }
    }

    /**
     * Closes every registered {@link AbstractNioChannel} and cancels/unregisters every {@link NioTask} key.
     * Channels are collected first and closed after the key set has been fully iterated.
     */
    private void closeAll() {
        cleanupKeys();
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask task = (NioTask) a;
                invokeChannelUnregistered(task, k, null);
            }
        }

        for (AbstractNioChannel ch: channels) {
            unregisterWritableTasks(ch);
            ch.unsafe().close(ch.unsafe().voidFuture());
        }
    }

    /**
     * Calls {@code task.channelUnregistered(...)}; any {@link Exception} it throws is logged and swallowed.
     */
    private static void invokeChannelUnregistered(NioTask task, SelectionKey k, Throwable cause) {
        try {
            task.channelUnregistered(k.channel(), cause);
        } catch (Exception e) {
            logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
        }
    }

    /**
     * Wakes up the selector when called from outside the event loop, at most once per loop iteration
     * (guarded by {@code wakenUp}).
     */
    @Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    /**
     * Performs a non-blocking select and afterwards re-issues a pending wake-up if one was requested.
     */
    void selectNow() throws IOException {
        try {
            selector.selectNow();
        } finally {
            // restore wakeup state if needed
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }

    /**
     * Performs a select that blocks for at most {@link #SELECT_TIMEOUT} milliseconds unless woken up.
     *
     * @return the value returned by {@link Selector#select(long)}, or {@code -1} if a
     *         {@link CancelledKeyException} was raised by the selector
     */
    private int select() throws IOException {
        try {
            // Bounded on purpose: run() measures the blocked time against SELECT_TIMEOUT, and a missed
            // wake-up must not be able to park the loop indefinitely.
            return selector.select(SELECT_TIMEOUT);
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(
                        CancelledKeyException.class.getSimpleName() +
                        " raised by a Selector - JDK bug?", e);
            }
            // Harmless exception - log anyway
        }
        return -1;
    }

    /**
     * Runs a non-blocking {@code selectNow()} so that the selector updates its key sets; failures are
     * logged and ignored.
     */
    void cleanupKeys() {
        try {
            selector.selectNow();
        } catch (Throwable t) {
            logger.warn("Failed to update SelectionKeys.", t);
        }
    }
}