86f53083e7
Motivation: SingleThreadEventExecutor.pendingTasks() calls taskQueue.size() to get the number of pending tasks in the queue. This is not safe when using MpscLinkedQueue, as size() may only be called by the single consumer. Modifications: Ensure size() is only called from the EventLoop. Result: No more livelock is possible when calling pendingTasks(), no matter which thread calls it.
755 lines
28 KiB
Java
755 lines
28 KiB
Java
/*
|
|
* Copyright 2012 The Netty Project
|
|
*
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at:
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
* License for the specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
package io.netty.channel.nio;
|
|
|
|
import io.netty.channel.Channel;
|
|
import io.netty.channel.ChannelException;
|
|
import io.netty.channel.EventLoop;
|
|
import io.netty.channel.EventLoopException;
|
|
import io.netty.channel.SelectStrategy;
|
|
import io.netty.channel.SingleThreadEventLoop;
|
|
import io.netty.util.IntSupplier;
|
|
import io.netty.util.internal.PlatformDependent;
|
|
import io.netty.util.internal.SystemPropertyUtil;
|
|
import io.netty.util.internal.logging.InternalLogger;
|
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
|
|
|
import java.io.IOException;
|
|
import java.lang.reflect.Field;
|
|
import java.nio.channels.CancelledKeyException;
|
|
import java.nio.channels.SelectableChannel;
|
|
import java.nio.channels.SelectionKey;
|
|
import java.nio.channels.Selector;
|
|
import java.nio.channels.spi.SelectorProvider;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.ConcurrentModificationException;
|
|
import java.util.Iterator;
|
|
import java.util.Queue;
|
|
import java.util.Set;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
/**
|
|
* {@link SingleThreadEventLoop} implementation which registers the {@link Channel}s to a
* {@link Selector} and so performs the multiplexing of these in the event loop.
|
|
*
|
|
*/
|
|
public final class NioEventLoop extends SingleThreadEventLoop {
|
|
|
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
|
|
|
|
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
|
|
|
|
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
|
|
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
|
|
|
|
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
|
|
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
|
|
|
|
private final IntSupplier selectNowSupplier = new IntSupplier() {
|
|
@Override
|
|
public int get() throws Exception {
|
|
return selectNow();
|
|
}
|
|
};
|
|
private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
|
|
@Override
|
|
public Integer call() throws Exception {
|
|
return NioEventLoop.super.pendingTasks();
|
|
}
|
|
};
|
|
|
|
// Workaround for JDK NIO bug.
|
|
//
|
|
// See:
|
|
// - http://bugs.sun.com/view_bug.do?bug_id=6427854
|
|
// - https://github.com/netty/netty/issues/203
|
|
static {
|
|
String key = "sun.nio.ch.bugLevel";
|
|
try {
|
|
String buglevel = SystemPropertyUtil.get(key);
|
|
if (buglevel == null) {
|
|
System.setProperty(key, "");
|
|
}
|
|
} catch (SecurityException e) {
|
|
if (logger.isDebugEnabled()) {
|
|
logger.debug("Unable to get/set System Property: {}", key, e);
|
|
}
|
|
}
|
|
|
|
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
|
|
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
|
|
selectorAutoRebuildThreshold = 0;
|
|
}
|
|
|
|
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
|
|
|
|
if (logger.isDebugEnabled()) {
|
|
logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
|
|
logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* The NIO {@link Selector}.
|
|
*/
|
|
Selector selector;
|
|
private SelectedSelectionKeySet selectedKeys;
|
|
|
|
private final SelectorProvider provider;
|
|
|
|
/**
|
|
* Boolean that controls determines if a blocked Selector.select should
|
|
* break out of its selection process. In our case we use a timeout for
|
|
* the select method and the select method will block for that time unless
|
|
* waken up.
|
|
*/
|
|
private final AtomicBoolean wakenUp = new AtomicBoolean();
|
|
|
|
private final SelectStrategy selectStrategy;
|
|
|
|
private volatile int ioRatio = 50;
|
|
private int cancelledKeys;
|
|
private boolean needsToSelectAgain;
|
|
|
|
/**
 * Creates the event loop and opens its {@link Selector}.
 *
 * @param parent           group passed to the super constructor
 * @param executor         executor passed to the super constructor
 * @param selectorProvider provider used to open the selector; must not be {@code null}
 * @param strategy         select strategy consulted by {@code run()}; must not be {@code null}
 * @throws NullPointerException if {@code selectorProvider} or {@code strategy} is {@code null}
 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy) {
    super(parent, executor, false);

    // Validate in declaration order so the first offending argument is the one reported.
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }

    selectStrategy = strategy;
    // openSelector() reads 'provider', so it has to be assigned first.
    provider = selectorProvider;
    selector = openSelector();
}
|
|
|
|
private Selector openSelector() {
|
|
final Selector selector;
|
|
try {
|
|
selector = provider.openSelector();
|
|
} catch (IOException e) {
|
|
throw new ChannelException("failed to open a new selector", e);
|
|
}
|
|
|
|
if (DISABLE_KEYSET_OPTIMIZATION) {
|
|
return selector;
|
|
}
|
|
|
|
try {
|
|
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
|
|
|
|
Class<?> selectorImplClass =
|
|
Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
|
|
|
|
// Ensure the current selector implementation is what we can instrument.
|
|
if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
|
|
return selector;
|
|
}
|
|
|
|
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
|
|
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
|
|
|
|
selectedKeysField.setAccessible(true);
|
|
publicSelectedKeysField.setAccessible(true);
|
|
|
|
selectedKeysField.set(selector, selectedKeySet);
|
|
publicSelectedKeysField.set(selector, selectedKeySet);
|
|
|
|
selectedKeys = selectedKeySet;
|
|
logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
|
|
} catch (Throwable t) {
|
|
selectedKeys = null;
|
|
logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
|
|
}
|
|
|
|
return selector;
|
|
}
|
|
|
|
@Override
|
|
protected Queue<Runnable> newTaskQueue() {
|
|
// This event loop never calls takeTask()
|
|
return PlatformDependent.newMpscQueue();
|
|
}
|
|
|
|
@Override
|
|
public int pendingTasks() {
|
|
// As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
|
|
// otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
|
|
// See https://github.com/netty/netty/issues/5297
|
|
if (inEventLoop()) {
|
|
return super.pendingTasks();
|
|
} else {
|
|
return submit(pendingTasksCallable).syncUninterruptibly().getNow();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
|
|
* of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
|
|
* be executed by this event loop when the {@link SelectableChannel} is ready.
|
|
*/
|
|
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
|
|
if (ch == null) {
|
|
throw new NullPointerException("ch");
|
|
}
|
|
if (interestOps == 0) {
|
|
throw new IllegalArgumentException("interestOps must be non-zero.");
|
|
}
|
|
if ((interestOps & ~ch.validOps()) != 0) {
|
|
throw new IllegalArgumentException(
|
|
"invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
|
|
}
|
|
if (task == null) {
|
|
throw new NullPointerException("task");
|
|
}
|
|
|
|
if (isShutdown()) {
|
|
throw new IllegalStateException("event loop shut down");
|
|
}
|
|
|
|
try {
|
|
ch.register(selector, interestOps, task);
|
|
} catch (Exception e) {
|
|
throw new EventLoopException("failed to register a channel", e);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Returns the percentage of the desired amount of time spent for I/O in the event loop.
|
|
*/
|
|
public int getIoRatio() {
|
|
return ioRatio;
|
|
}
|
|
|
|
/**
|
|
* Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
|
|
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
|
|
*/
|
|
public void setIoRatio(int ioRatio) {
|
|
if (ioRatio <= 0 || ioRatio > 100) {
|
|
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
|
|
}
|
|
this.ioRatio = ioRatio;
|
|
}
|
|
|
|
/**
|
|
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
|
|
* around the infamous epoll 100% CPU bug.
|
|
*/
|
|
public void rebuildSelector() {
|
|
if (!inEventLoop()) {
|
|
execute(new Runnable() {
|
|
@Override
|
|
public void run() {
|
|
rebuildSelector();
|
|
}
|
|
});
|
|
return;
|
|
}
|
|
|
|
final Selector oldSelector = selector;
|
|
final Selector newSelector;
|
|
|
|
if (oldSelector == null) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
newSelector = openSelector();
|
|
} catch (Exception e) {
|
|
logger.warn("Failed to create a new Selector.", e);
|
|
return;
|
|
}
|
|
|
|
// Register all channels to the new Selector.
|
|
int nChannels = 0;
|
|
for (;;) {
|
|
try {
|
|
for (SelectionKey key: oldSelector.keys()) {
|
|
Object a = key.attachment();
|
|
try {
|
|
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
|
|
continue;
|
|
}
|
|
|
|
int interestOps = key.interestOps();
|
|
key.cancel();
|
|
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
|
|
if (a instanceof AbstractNioChannel) {
|
|
// Update SelectionKey
|
|
((AbstractNioChannel) a).selectionKey = newKey;
|
|
}
|
|
nChannels ++;
|
|
} catch (Exception e) {
|
|
logger.warn("Failed to re-register a Channel to the new Selector.", e);
|
|
if (a instanceof AbstractNioChannel) {
|
|
AbstractNioChannel ch = (AbstractNioChannel) a;
|
|
ch.unsafe().close(ch.unsafe().voidPromise());
|
|
} else {
|
|
@SuppressWarnings("unchecked")
|
|
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
|
|
invokeChannelUnregistered(task, key, e);
|
|
}
|
|
}
|
|
}
|
|
} catch (ConcurrentModificationException e) {
|
|
// Probably due to concurrent modification of the key set.
|
|
continue;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
selector = newSelector;
|
|
|
|
try {
|
|
// time to close the old selector as everything else is registered to the new one
|
|
oldSelector.close();
|
|
} catch (Throwable t) {
|
|
if (logger.isWarnEnabled()) {
|
|
logger.warn("Failed to close the old Selector.", t);
|
|
}
|
|
}
|
|
|
|
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
|
|
}
|
|
|
|
@Override
|
|
protected void run() {
|
|
for (;;) {
|
|
try {
|
|
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
|
|
case SelectStrategy.CONTINUE:
|
|
continue;
|
|
case SelectStrategy.SELECT:
|
|
select(wakenUp.getAndSet(false));
|
|
|
|
// 'wakenUp.compareAndSet(false, true)' is always evaluated
|
|
// before calling 'selector.wakeup()' to reduce the wake-up
|
|
// overhead. (Selector.wakeup() is an expensive operation.)
|
|
//
|
|
// However, there is a race condition in this approach.
|
|
// The race condition is triggered when 'wakenUp' is set to
|
|
// true too early.
|
|
//
|
|
// 'wakenUp' is set to true too early if:
|
|
// 1) Selector is waken up between 'wakenUp.set(false)' and
|
|
// 'selector.select(...)'. (BAD)
|
|
// 2) Selector is waken up between 'selector.select(...)' and
|
|
// 'if (wakenUp.get()) { ... }'. (OK)
|
|
//
|
|
// In the first case, 'wakenUp' is set to true and the
|
|
// following 'selector.select(...)' will wake up immediately.
|
|
// Until 'wakenUp' is set to false again in the next round,
|
|
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
|
|
// any attempt to wake up the Selector will fail, too, causing
|
|
// the following 'selector.select(...)' call to block
|
|
// unnecessarily.
|
|
//
|
|
// To fix this problem, we wake up the selector again if wakenUp
|
|
// is true immediately after selector.select(...).
|
|
// It is inefficient in that it wakes up the selector for both
|
|
// the first case (BAD - wake-up required) and the second case
|
|
// (OK - no wake-up required).
|
|
|
|
if (wakenUp.get()) {
|
|
selector.wakeup();
|
|
}
|
|
default:
|
|
// fallthrough
|
|
}
|
|
|
|
cancelledKeys = 0;
|
|
needsToSelectAgain = false;
|
|
final int ioRatio = this.ioRatio;
|
|
if (ioRatio == 100) {
|
|
processSelectedKeys();
|
|
runAllTasks();
|
|
} else {
|
|
final long ioStartTime = System.nanoTime();
|
|
|
|
processSelectedKeys();
|
|
|
|
final long ioTime = System.nanoTime() - ioStartTime;
|
|
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
|
|
}
|
|
|
|
if (isShuttingDown()) {
|
|
closeAll();
|
|
if (confirmShutdown()) {
|
|
break;
|
|
}
|
|
}
|
|
} catch (Throwable t) {
|
|
logger.warn("Unexpected exception in the selector loop.", t);
|
|
|
|
// Prevent possible consecutive immediate failures that lead to
|
|
// excessive CPU consumption.
|
|
try {
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException e) {
|
|
// Ignore.
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private void processSelectedKeys() {
|
|
if (selectedKeys != null) {
|
|
processSelectedKeysOptimized(selectedKeys.flip());
|
|
} else {
|
|
processSelectedKeysPlain(selector.selectedKeys());
|
|
}
|
|
}
|
|
|
|
@Override
|
|
protected void cleanup() {
|
|
try {
|
|
selector.close();
|
|
} catch (IOException e) {
|
|
logger.warn("Failed to close a selector.", e);
|
|
}
|
|
}
|
|
|
|
void cancel(SelectionKey key) {
|
|
key.cancel();
|
|
cancelledKeys ++;
|
|
if (cancelledKeys >= CLEANUP_INTERVAL) {
|
|
cancelledKeys = 0;
|
|
needsToSelectAgain = true;
|
|
}
|
|
}
|
|
|
|
@Override
|
|
protected Runnable pollTask() {
|
|
Runnable task = super.pollTask();
|
|
if (needsToSelectAgain) {
|
|
selectAgain();
|
|
}
|
|
return task;
|
|
}
|
|
|
|
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
|
|
// check if the set is empty and if so just return to not create garbage by
|
|
// creating a new Iterator every time even if there is nothing to process.
|
|
// See https://github.com/netty/netty/issues/597
|
|
if (selectedKeys.isEmpty()) {
|
|
return;
|
|
}
|
|
|
|
Iterator<SelectionKey> i = selectedKeys.iterator();
|
|
for (;;) {
|
|
final SelectionKey k = i.next();
|
|
final Object a = k.attachment();
|
|
i.remove();
|
|
|
|
if (a instanceof AbstractNioChannel) {
|
|
processSelectedKey(k, (AbstractNioChannel) a);
|
|
} else {
|
|
@SuppressWarnings("unchecked")
|
|
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
|
|
processSelectedKey(k, task);
|
|
}
|
|
|
|
if (!i.hasNext()) {
|
|
break;
|
|
}
|
|
|
|
if (needsToSelectAgain) {
|
|
selectAgain();
|
|
selectedKeys = selector.selectedKeys();
|
|
|
|
// Create the iterator again to avoid ConcurrentModificationException
|
|
if (selectedKeys.isEmpty()) {
|
|
break;
|
|
} else {
|
|
i = selectedKeys.iterator();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
|
|
for (int i = 0;; i ++) {
|
|
final SelectionKey k = selectedKeys[i];
|
|
if (k == null) {
|
|
break;
|
|
}
|
|
// null out entry in the array to allow to have it GC'ed once the Channel close
|
|
// See https://github.com/netty/netty/issues/2363
|
|
selectedKeys[i] = null;
|
|
|
|
final Object a = k.attachment();
|
|
|
|
if (a instanceof AbstractNioChannel) {
|
|
processSelectedKey(k, (AbstractNioChannel) a);
|
|
} else {
|
|
@SuppressWarnings("unchecked")
|
|
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
|
|
processSelectedKey(k, task);
|
|
}
|
|
|
|
if (needsToSelectAgain) {
|
|
// null out entries in the array to allow to have it GC'ed once the Channel close
|
|
// See https://github.com/netty/netty/issues/2363
|
|
for (;;) {
|
|
i++;
|
|
if (selectedKeys[i] == null) {
|
|
break;
|
|
}
|
|
selectedKeys[i] = null;
|
|
}
|
|
|
|
selectAgain();
|
|
// Need to flip the optimized selectedKeys to get the right reference to the array
|
|
// and reset the index to -1 which will then set to 0 on the for loop
|
|
// to start over again.
|
|
//
|
|
// See https://github.com/netty/netty/issues/1523
|
|
selectedKeys = this.selectedKeys.flip();
|
|
i = -1;
|
|
}
|
|
}
|
|
}
|
|
|
|
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
|
|
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
|
|
if (!k.isValid()) {
|
|
final EventLoop eventLoop;
|
|
try {
|
|
eventLoop = ch.eventLoop();
|
|
} catch (Throwable ignored) {
|
|
// If the channel implementation throws an exception because there is no event loop, we ignore this
|
|
// because we are only trying to determine if ch is registered to this event loop and thus has authority
|
|
// to close ch.
|
|
return;
|
|
}
|
|
// Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
|
|
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
|
|
// still healthy and should not be closed.
|
|
// See https://github.com/netty/netty/issues/5125
|
|
if (eventLoop != this || eventLoop == null) {
|
|
return;
|
|
}
|
|
// close the channel if the key is not valid anymore
|
|
unsafe.close(unsafe.voidPromise());
|
|
return;
|
|
}
|
|
|
|
try {
|
|
int readyOps = k.readyOps();
|
|
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
|
|
// to a spin loop
|
|
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
|
|
unsafe.read();
|
|
if (!ch.isOpen()) {
|
|
// Connection already closed - no need to handle write.
|
|
return;
|
|
}
|
|
}
|
|
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
|
|
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
|
|
ch.unsafe().forceFlush();
|
|
}
|
|
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
|
|
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
|
|
// See https://github.com/netty/netty/issues/924
|
|
int ops = k.interestOps();
|
|
ops &= ~SelectionKey.OP_CONNECT;
|
|
k.interestOps(ops);
|
|
|
|
unsafe.finishConnect();
|
|
}
|
|
} catch (CancelledKeyException ignored) {
|
|
unsafe.close(unsafe.voidPromise());
|
|
}
|
|
}
|
|
|
|
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
|
|
int state = 0;
|
|
try {
|
|
task.channelReady(k.channel(), k);
|
|
state = 1;
|
|
} catch (Exception e) {
|
|
k.cancel();
|
|
invokeChannelUnregistered(task, k, e);
|
|
state = 2;
|
|
} finally {
|
|
switch (state) {
|
|
case 0:
|
|
k.cancel();
|
|
invokeChannelUnregistered(task, k, null);
|
|
break;
|
|
case 1:
|
|
if (!k.isValid()) { // Cancelled by channelReady()
|
|
invokeChannelUnregistered(task, k, null);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
private void closeAll() {
|
|
selectAgain();
|
|
Set<SelectionKey> keys = selector.keys();
|
|
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
|
|
for (SelectionKey k: keys) {
|
|
Object a = k.attachment();
|
|
if (a instanceof AbstractNioChannel) {
|
|
channels.add((AbstractNioChannel) a);
|
|
} else {
|
|
k.cancel();
|
|
@SuppressWarnings("unchecked")
|
|
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
|
|
invokeChannelUnregistered(task, k, null);
|
|
}
|
|
}
|
|
|
|
for (AbstractNioChannel ch: channels) {
|
|
ch.unsafe().close(ch.unsafe().voidPromise());
|
|
}
|
|
}
|
|
|
|
private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
|
|
try {
|
|
task.channelUnregistered(k.channel(), cause);
|
|
} catch (Exception e) {
|
|
logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
protected void wakeup(boolean inEventLoop) {
|
|
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
|
|
selector.wakeup();
|
|
}
|
|
}
|
|
|
|
int selectNow() throws IOException {
|
|
try {
|
|
return selector.selectNow();
|
|
} finally {
|
|
// restore wakup state if needed
|
|
if (wakenUp.get()) {
|
|
selector.wakeup();
|
|
}
|
|
}
|
|
}
|
|
|
|
private void select(boolean oldWakenUp) throws IOException {
|
|
Selector selector = this.selector;
|
|
try {
|
|
int selectCnt = 0;
|
|
long currentTimeNanos = System.nanoTime();
|
|
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
|
|
for (;;) {
|
|
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
|
|
if (timeoutMillis <= 0) {
|
|
if (selectCnt == 0) {
|
|
selector.selectNow();
|
|
selectCnt = 1;
|
|
}
|
|
break;
|
|
}
|
|
|
|
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
|
|
// Selector#wakeup. So we need to check task queue again before executing select operation.
|
|
// If we don't, the task might be pended until select operation was timed out.
|
|
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
|
|
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
|
|
selector.selectNow();
|
|
selectCnt = 1;
|
|
break;
|
|
}
|
|
|
|
int selectedKeys = selector.select(timeoutMillis);
|
|
selectCnt ++;
|
|
|
|
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
|
|
// - Selected something,
|
|
// - waken up by user, or
|
|
// - the task queue has a pending task.
|
|
// - a scheduled task is ready for processing
|
|
break;
|
|
}
|
|
if (Thread.interrupted()) {
|
|
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
|
|
// As this is most likely a bug in the handler of the user or it's client library we will
|
|
// also log it.
|
|
//
|
|
// See https://github.com/netty/netty/issues/2426
|
|
if (logger.isDebugEnabled()) {
|
|
logger.debug("Selector.select() returned prematurely because " +
|
|
"Thread.currentThread().interrupt() was called. Use " +
|
|
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
|
|
}
|
|
selectCnt = 1;
|
|
break;
|
|
}
|
|
|
|
long time = System.nanoTime();
|
|
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
|
|
// timeoutMillis elapsed without anything selected.
|
|
selectCnt = 1;
|
|
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
|
|
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
|
|
// The selector returned prematurely many times in a row.
|
|
// Rebuild the selector to work around the problem.
|
|
logger.warn(
|
|
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
|
|
selectCnt);
|
|
|
|
rebuildSelector();
|
|
selector = this.selector;
|
|
|
|
// Select again to populate selectedKeys.
|
|
selector.selectNow();
|
|
selectCnt = 1;
|
|
break;
|
|
}
|
|
|
|
currentTimeNanos = time;
|
|
}
|
|
|
|
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
|
|
if (logger.isDebugEnabled()) {
|
|
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
|
|
}
|
|
}
|
|
} catch (CancelledKeyException e) {
|
|
if (logger.isDebugEnabled()) {
|
|
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
|
|
}
|
|
// Harmless exception - log anyway
|
|
}
|
|
}
|
|
|
|
private void selectAgain() {
|
|
needsToSelectAgain = false;
|
|
try {
|
|
selector.selectNow();
|
|
} catch (Throwable t) {
|
|
logger.warn("Failed to update SelectionKeys.", t);
|
|
}
|
|
}
|
|
}
|