2012-06-04 22:31:44 +02:00
|
|
|
/*
|
|
|
|
* Copyright 2012 The Netty Project
|
|
|
|
*
|
|
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
* with the License. You may obtain a copy of the License at:
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
* License for the specific language governing permissions and limitations
|
|
|
|
* under the License.
|
|
|
|
*/
|
2012-05-26 00:51:22 +02:00
|
|
|
package io.netty.channel.socket.nio;
|
|
|
|
|
2012-08-10 13:17:18 +02:00
|
|
|
import io.netty.channel.Channel;
|
|
|
|
import io.netty.channel.ChannelException;
|
2012-08-19 08:10:09 +02:00
|
|
|
import io.netty.channel.ChannelTaskScheduler;
|
2012-10-25 02:08:11 +02:00
|
|
|
import io.netty.channel.EventLoopException;
|
2012-09-01 09:56:09 +02:00
|
|
|
import io.netty.channel.SingleThreadEventLoop;
|
2012-08-10 13:17:18 +02:00
|
|
|
import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe;
|
|
|
|
import io.netty.logging.InternalLogger;
|
|
|
|
import io.netty.logging.InternalLoggerFactory;
|
2012-05-26 00:51:22 +02:00
|
|
|
|
2012-08-10 13:17:18 +02:00
|
|
|
import java.io.IOException;
|
2012-08-29 08:12:19 +02:00
|
|
|
import java.nio.channels.CancelledKeyException;
|
2012-09-01 09:56:09 +02:00
|
|
|
import java.nio.channels.SelectableChannel;
|
2012-08-29 08:12:19 +02:00
|
|
|
import java.nio.channels.SelectionKey;
|
|
|
|
import java.nio.channels.Selector;
|
2012-05-26 00:51:22 +02:00
|
|
|
import java.nio.channels.spi.SelectorProvider;
|
2012-08-10 13:17:18 +02:00
|
|
|
import java.util.ArrayList;
|
|
|
|
import java.util.Collection;
|
|
|
|
import java.util.Iterator;
|
2012-08-18 11:48:44 +02:00
|
|
|
import java.util.Queue;
|
2012-08-10 13:17:18 +02:00
|
|
|
import java.util.Set;
|
2012-08-18 11:48:44 +02:00
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
2012-05-26 00:51:22 +02:00
|
|
|
import java.util.concurrent.ThreadFactory;
|
2012-08-10 13:17:18 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
2012-05-26 00:51:22 +02:00
|
|
|
|
2012-08-28 02:10:17 +02:00
|
|
|
/**
|
|
|
|
* {@link SingleThreadEventLoop} implementation which registers the {@link Channel}s to a
|
|
|
|
* {@link Selector} and so does the multi-plexing of these in the event loop.
|
|
|
|
*
|
|
|
|
*/
|
2012-10-25 02:08:11 +02:00
|
|
|
public final class NioEventLoop extends SingleThreadEventLoop {
|
2012-05-26 00:51:22 +02:00
|
|
|
|
2012-08-10 13:17:18 +02:00
|
|
|
/**
|
|
|
|
* Internal Netty logger.
|
|
|
|
*/
|
2012-09-01 09:56:09 +02:00
|
|
|
private static final InternalLogger logger =
|
|
|
|
InternalLoggerFactory.getInstance(NioEventLoop.class);
|
2012-08-10 13:17:18 +02:00
|
|
|
|
|
|
|
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
|
|
|
|
|
|
|
|
/**
|
|
|
|
* The NIO {@link Selector}.
|
|
|
|
*/
|
2012-09-01 09:56:09 +02:00
|
|
|
Selector selector;
|
2012-08-10 13:17:18 +02:00
|
|
|
|
2012-09-01 09:56:09 +02:00
|
|
|
private final SelectorProvider provider;
|
2012-08-29 08:12:19 +02:00
|
|
|
|
2012-08-10 13:17:18 +02:00
|
|
|
/**
|
|
|
|
* Boolean that determines whether a blocked Selector.select should
|
|
|
|
* break out of its selection process. In our case we use a timeout for
|
|
|
|
* the select method and the select method will block for that time unless
|
|
|
|
* woken up.
|
|
|
|
*/
|
2012-09-01 09:56:09 +02:00
|
|
|
private final AtomicBoolean wakenUp = new AtomicBoolean();
|
2012-08-10 13:17:18 +02:00
|
|
|
|
|
|
|
private int cancelledKeys;
|
|
|
|
private boolean cleanedCancelledKeys;
|
|
|
|
|
2012-08-18 11:40:21 +02:00
|
|
|
/**
 * Creates a new event loop and opens its {@link Selector}.
 *
 * @param parent           the group handed to the super constructor
 * @param threadFactory    the thread factory handed to the super constructor
 * @param scheduler        the task scheduler handed to the super constructor
 * @param selectorProvider the provider used to open this loop's {@link Selector}
 * @throws NullPointerException if {@code selectorProvider} is {@code null}
 * @throws ChannelException     if the {@link Selector} cannot be opened
 */
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory,
        ChannelTaskScheduler scheduler, SelectorProvider selectorProvider) {
    super(parent, threadFactory, scheduler);

    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }

    // The provider must be assigned first: openSelector() reads it.
    this.provider = selectorProvider;
    this.selector = openSelector();
}
|
2012-05-26 00:51:22 +02:00
|
|
|
|
2012-08-29 08:03:32 +02:00
|
|
|
private Selector openSelector() {
|
2012-08-10 13:17:18 +02:00
|
|
|
try {
|
|
|
|
return provider.openSelector();
|
|
|
|
} catch (IOException e) {
|
|
|
|
throw new ChannelException("failed to open a new selector", e);
|
|
|
|
}
|
2012-05-26 00:51:22 +02:00
|
|
|
}
|
|
|
|
|
2012-08-18 11:48:44 +02:00
|
|
|
@Override
|
|
|
|
protected Queue<Runnable> newTaskQueue() {
|
|
|
|
// This event loop never calls takeTask()
|
|
|
|
return new ConcurrentLinkedQueue<Runnable>();
|
|
|
|
}
|
|
|
|
|
2012-10-25 02:08:11 +02:00
|
|
|
/**
|
|
|
|
* Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
|
|
|
|
* of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
|
|
|
|
* be executed by this event loop when the {@link SelectableChannel} is ready.
|
|
|
|
*/
|
|
|
|
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
|
|
|
|
if (ch == null) {
|
|
|
|
throw new NullPointerException("ch");
|
|
|
|
}
|
|
|
|
if (interestOps == 0) {
|
|
|
|
throw new IllegalArgumentException("interestOps must be non-zero.");
|
|
|
|
}
|
|
|
|
if ((interestOps & ~ch.validOps()) != 0) {
|
|
|
|
throw new IllegalArgumentException(
|
|
|
|
"invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ")");
|
|
|
|
}
|
|
|
|
if (task == null) {
|
|
|
|
throw new NullPointerException("task");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (isShutdown()) {
|
|
|
|
throw new IllegalStateException("event loop shut down");
|
|
|
|
}
|
|
|
|
|
|
|
|
try {
|
|
|
|
ch.register(selector, interestOps, task);
|
|
|
|
} catch (Exception e) {
|
|
|
|
throw new EventLoopException("failed to register a channel", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-08-29 08:03:32 +02:00
|
|
|
// Create a new selector and "transfer" all channels from the old
|
|
|
|
// selector to the new one
|
|
|
|
private Selector recreateSelector() {
|
2012-10-25 02:08:11 +02:00
|
|
|
final Selector newSelector = openSelector();
|
2012-11-09 23:03:07 +01:00
|
|
|
final Selector oldSelector = selector;
|
2012-08-29 08:03:32 +02:00
|
|
|
|
2012-10-25 02:08:11 +02:00
|
|
|
// Register all channels to the new Selector.
|
|
|
|
boolean success = false;
|
|
|
|
try {
|
|
|
|
for (SelectionKey key: oldSelector.keys()) {
|
|
|
|
key.channel().register(newSelector, key.interestOps(), key.attachment());
|
2012-08-29 08:03:32 +02:00
|
|
|
}
|
2012-10-25 02:08:11 +02:00
|
|
|
success = true;
|
|
|
|
} catch (Exception e) {
|
|
|
|
logger.warn("Failed to re-register a Channel to the new Selector.", e);
|
|
|
|
} finally {
|
|
|
|
if (!success) {
|
|
|
|
try {
|
|
|
|
newSelector.close();
|
|
|
|
} catch (Exception e) {
|
|
|
|
logger.warn("Failed to close the new Selector.", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!success) {
|
|
|
|
// Keep using the old Selector on failure.
|
|
|
|
return oldSelector;
|
2012-08-29 08:03:32 +02:00
|
|
|
}
|
2012-10-25 02:08:11 +02:00
|
|
|
|
|
|
|
// Registration to the new Selector is done. Close the old Selector to cancel all old keys.
|
2012-08-29 08:03:32 +02:00
|
|
|
try {
|
|
|
|
selector.close();
|
2012-10-25 02:08:11 +02:00
|
|
|
} catch (Exception e) {
|
|
|
|
logger.warn("Failed to close the old selector.", e);
|
2012-08-29 08:03:32 +02:00
|
|
|
}
|
2012-10-25 02:08:11 +02:00
|
|
|
|
|
|
|
logger.info("Selector migration complete.");
|
2012-11-09 23:03:07 +01:00
|
|
|
return selector = newSelector;
|
2012-08-29 08:03:32 +02:00
|
|
|
}
|
2012-08-29 08:12:19 +02:00
|
|
|
|
2012-08-10 13:17:18 +02:00
|
|
|
@Override
|
|
|
|
protected void run() {
|
|
|
|
Selector selector = this.selector;
|
2012-08-29 08:03:32 +02:00
|
|
|
int selectReturnsImmediately = 0;
|
|
|
|
|
|
|
|
// use 80% of the timeout for measure
|
|
|
|
long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS / 100 * 80;
|
|
|
|
|
2012-08-10 13:17:18 +02:00
|
|
|
for (;;) {
|
|
|
|
|
|
|
|
wakenUp.set(false);
|
|
|
|
|
|
|
|
try {
|
2012-08-29 08:03:32 +02:00
|
|
|
long beforeSelect = System.nanoTime();
|
|
|
|
int selected = SelectorUtil.select(selector);
|
2012-09-01 09:56:09 +02:00
|
|
|
if (SelectorUtil.EPOLL_BUG_WORKAROUND) {
|
|
|
|
if (selected == 0) {
|
|
|
|
long timeBlocked = System.nanoTime() - beforeSelect;
|
|
|
|
if (timeBlocked < minSelectTimeout) {
|
|
|
|
// returned before the minSelectTimeout elapsed with nothing select.
|
|
|
|
// this may be the cause of the jdk epoll(..) bug, so increment the counter
|
|
|
|
// which we use later to see if its really the jdk bug.
|
|
|
|
selectReturnsImmediately ++;
|
|
|
|
} else {
|
|
|
|
selectReturnsImmediately = 0;
|
|
|
|
}
|
|
|
|
if (selectReturnsImmediately == 10) {
|
|
|
|
// The selector returned immediately for 10 times in a row,
|
|
|
|
// so recreate one selector as it seems like we hit the
|
|
|
|
// famous epoll(..) jdk bug.
|
|
|
|
selector = recreateSelector();
|
|
|
|
selectReturnsImmediately = 0;
|
|
|
|
|
|
|
|
// try to select again
|
|
|
|
continue;
|
|
|
|
}
|
2012-08-29 08:03:32 +02:00
|
|
|
} else {
|
2012-09-01 09:56:09 +02:00
|
|
|
// reset counter
|
2012-08-29 08:03:32 +02:00
|
|
|
selectReturnsImmediately = 0;
|
|
|
|
}
|
|
|
|
}
|
2012-08-10 13:17:18 +02:00
|
|
|
|
|
|
|
// 'wakenUp.compareAndSet(false, true)' is always evaluated
|
|
|
|
// before calling 'selector.wakeup()' to reduce the wake-up
|
|
|
|
// overhead. (Selector.wakeup() is an expensive operation.)
|
|
|
|
//
|
|
|
|
// However, there is a race condition in this approach.
|
|
|
|
// The race condition is triggered when 'wakenUp' is set to
|
|
|
|
// true too early.
|
|
|
|
//
|
|
|
|
// 'wakenUp' is set to true too early if:
|
|
|
|
// 1) Selector is waken up between 'wakenUp.set(false)' and
|
|
|
|
// 'selector.select(...)'. (BAD)
|
|
|
|
// 2) Selector is waken up between 'selector.select(...)' and
|
|
|
|
// 'if (wakenUp.get()) { ... }'. (OK)
|
|
|
|
//
|
|
|
|
// In the first case, 'wakenUp' is set to true and the
|
|
|
|
// following 'selector.select(...)' will wake up immediately.
|
|
|
|
// Until 'wakenUp' is set to false again in the next round,
|
|
|
|
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
|
|
|
|
// any attempt to wake up the Selector will fail, too, causing
|
|
|
|
// the following 'selector.select(...)' call to block
|
|
|
|
// unnecessarily.
|
|
|
|
//
|
|
|
|
// To fix this problem, we wake up the selector again if wakenUp
|
|
|
|
// is true immediately after selector.select(...).
|
|
|
|
// It is inefficient in that it wakes up the selector for both
|
|
|
|
// the first case (BAD - wake-up required) and the second case
|
|
|
|
// (OK - no wake-up required).
|
|
|
|
|
|
|
|
if (wakenUp.get()) {
|
|
|
|
selector.wakeup();
|
|
|
|
}
|
|
|
|
|
|
|
|
cancelledKeys = 0;
|
|
|
|
runAllTasks();
|
|
|
|
processSelectedKeys();
|
|
|
|
|
|
|
|
if (isShutdown()) {
|
|
|
|
closeAll();
|
|
|
|
if (peekTask() == null) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (Throwable t) {
|
|
|
|
logger.warn(
|
|
|
|
"Unexpected exception in the selector loop.", t);
|
|
|
|
|
|
|
|
// Prevent possible consecutive immediate failures that lead to
|
|
|
|
// excessive CPU consumption.
|
|
|
|
try {
|
|
|
|
Thread.sleep(1000);
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// Ignore.
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2012-05-26 00:51:22 +02:00
|
|
|
}
|
|
|
|
|
2012-08-10 13:17:18 +02:00
|
|
|
@Override
|
|
|
|
protected void cleanup() {
|
|
|
|
try {
|
|
|
|
selector.close();
|
|
|
|
} catch (IOException e) {
|
2012-10-25 02:09:40 +02:00
|
|
|
logger.warn("Failed to close a selector.", e);
|
2012-08-10 13:17:18 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void cancel(SelectionKey key) {
|
|
|
|
key.cancel();
|
|
|
|
cancelledKeys ++;
|
|
|
|
if (cancelledKeys >= CLEANUP_INTERVAL) {
|
|
|
|
cancelledKeys = 0;
|
|
|
|
cleanedCancelledKeys = true;
|
|
|
|
SelectorUtil.cleanupKeys(selector);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private void processSelectedKeys() {
|
|
|
|
Set<SelectionKey> selectedKeys = selector.selectedKeys();
|
2012-09-13 10:25:59 +02:00
|
|
|
// check if the set is empty and if so just return to not create garbage by
|
|
|
|
// creating a new Iterator every time even if there is nothing to process.
|
|
|
|
// See https://github.com/netty/netty/issues/597
|
2012-08-10 13:17:18 +02:00
|
|
|
if (selectedKeys.isEmpty()) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
Iterator<SelectionKey> i;
|
|
|
|
cleanedCancelledKeys = false;
|
|
|
|
boolean clearSelectedKeys = true;
|
|
|
|
try {
|
|
|
|
for (i = selectedKeys.iterator(); i.hasNext();) {
|
|
|
|
final SelectionKey k = i.next();
|
2012-10-25 02:08:11 +02:00
|
|
|
final Object a = k.attachment();
|
|
|
|
if (a instanceof AbstractNioChannel) {
|
|
|
|
processSelectedKey(k, (AbstractNioChannel) a);
|
|
|
|
} else {
|
|
|
|
processSelectedKey(k, (NioTask<SelectableChannel>) a);
|
2012-08-10 13:17:18 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
if (cleanedCancelledKeys) {
|
|
|
|
// Create the iterator again to avoid ConcurrentModificationException
|
|
|
|
if (selectedKeys.isEmpty()) {
|
|
|
|
clearSelectedKeys = false;
|
|
|
|
break;
|
|
|
|
} else {
|
|
|
|
i = selectedKeys.iterator();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
if (clearSelectedKeys) {
|
|
|
|
selectedKeys.clear();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-10-25 02:08:11 +02:00
|
|
|
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
|
|
|
|
final NioUnsafe unsafe = ch.unsafe();
|
|
|
|
try {
|
|
|
|
int readyOps = k.readyOps();
|
|
|
|
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
|
|
|
|
unsafe.read();
|
|
|
|
if (!ch.isOpen()) {
|
|
|
|
// Connection already closed - no need to handle write.
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
|
|
|
|
unsafe.flushNow();
|
|
|
|
}
|
|
|
|
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
|
|
|
|
unsafe.finishConnect();
|
|
|
|
}
|
|
|
|
} catch (CancelledKeyException ignored) {
|
|
|
|
unsafe.close(unsafe.voidFuture());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
|
|
|
|
boolean success = false;
|
|
|
|
try {
|
|
|
|
task.channelReady(k.channel(), k);
|
|
|
|
success = true;
|
|
|
|
} catch (Exception e) {
|
|
|
|
logger.warn("Unexpected exception while running NioTask.channelReady() - cancelling the key", e);
|
|
|
|
} finally {
|
|
|
|
if (!success) {
|
|
|
|
k.cancel();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!k.isValid()) { // Either cancelled by channelReady() or by this method.
|
|
|
|
invokeChannelUnregistered(task, k);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-08-10 13:17:18 +02:00
|
|
|
private void closeAll() {
|
|
|
|
SelectorUtil.cleanupKeys(selector);
|
|
|
|
Set<SelectionKey> keys = selector.keys();
|
|
|
|
Collection<Channel> channels = new ArrayList<Channel>(keys.size());
|
|
|
|
for (SelectionKey k: keys) {
|
2012-10-25 02:08:11 +02:00
|
|
|
Object a = k.attachment();
|
|
|
|
if (a instanceof Channel) {
|
|
|
|
channels.add((Channel) a);
|
|
|
|
} else {
|
|
|
|
k.cancel();
|
|
|
|
invokeChannelUnregistered((NioTask<SelectableChannel>) a, k);
|
|
|
|
}
|
2012-08-10 13:17:18 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
for (Channel ch: channels) {
|
|
|
|
ch.unsafe().close(ch.unsafe().voidFuture());
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
2012-05-26 00:51:22 +02:00
|
|
|
|
2012-10-25 02:08:11 +02:00
|
|
|
private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k) {
|
|
|
|
try {
|
|
|
|
task.channelUnregistered(k.channel());
|
|
|
|
} catch (Exception e) {
|
|
|
|
logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-06-02 02:51:19 +02:00
|
|
|
@Override
|
2012-08-10 13:17:18 +02:00
|
|
|
protected void wakeup(boolean inEventLoop) {
|
|
|
|
if (wakenUp.compareAndSet(false, true)) {
|
|
|
|
selector.wakeup();
|
|
|
|
}
|
2012-05-26 00:51:22 +02:00
|
|
|
}
|
|
|
|
}
|