[#681] Allow a user to access the Selector of an EventLoop

- Make NioEventLoop public so that a user can downcast it
- Add NioEventLoop.register()
- Add NioTask to let user specify what to do on select()
This commit is contained in:
Trustin Lee 2012-10-24 17:08:11 -07:00
parent 63d3210cff
commit abd37dacd6
2 changed files with 162 additions and 48 deletions

View File

@ -18,6 +18,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.channel.EventLoopException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe;
import io.netty.logging.InternalLogger;
@ -25,7 +26,6 @@ import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* {@link Selector} and so does the multi-plexing of these in the event loop.
*
*/
final class NioEventLoop extends SingleThreadEventLoop {
public final class NioEventLoop extends SingleThreadEventLoop {
/**
* Internal Netty logger.
@ -97,39 +97,76 @@ final class NioEventLoop extends SingleThreadEventLoop {
return new ConcurrentLinkedQueue<Runnable>();
}
/**
* Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
* of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
* be executed by this event loop when the {@link SelectableChannel} is ready.
*/
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
if (ch == null) {
throw new NullPointerException("ch");
}
if (interestOps == 0) {
throw new IllegalArgumentException("interestOps must be non-zero.");
}
if ((interestOps & ~ch.validOps()) != 0) {
throw new IllegalArgumentException(
"invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ")");
}
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
throw new IllegalStateException("event loop shut down");
}
try {
ch.register(selector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
}
}
// Create a new selector and "transfer" all channels from the old
// selector to the new one
private Selector recreateSelector() {
Selector newSelector = openSelector();
Selector selector = this.selector;
this.selector = newSelector;
final Selector newSelector = openSelector();
final Selector oldSelector = this.selector;
// loop over all the keys that are registered with the old Selector
// and register them with the new one
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
int ops = key.interestOps();
Object att = key.attachment();
// cancel the old key
cancel(key);
try {
// register the channel with the new selector now
ch.register(newSelector, ops, att);
} catch (ClosedChannelException e) {
// close channel
AbstractNioChannel channel = (AbstractNioChannel) att;
channel.unsafe().close(channel.unsafe().voidFuture());
// Register all channels to the new Selector.
boolean success = false;
try {
for (SelectionKey key: oldSelector.keys()) {
key.channel().register(newSelector, key.interestOps(), key.attachment());
}
success = true;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
} finally {
if (!success) {
try {
newSelector.close();
} catch (Exception e) {
logger.warn("Failed to close the new Selector.", e);
}
}
}
try {
// time to close the old selector as everything else is registered to the new one
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
if (!success) {
// Keep using the old Selector on failure.
return oldSelector;
}
logger.warn("Recreated Selector because of possible jdk epoll(..) bug");
return newSelector;
// Registration to the new Selector is done. Close the old Selector to cancel all old keys.
try {
selector.close();
} catch (Exception e) {
logger.warn("Failed to close the old selector.", e);
}
logger.info("Selector migration complete.");
return this.selector = newSelector;
}
@Override
@ -266,25 +303,11 @@ final class NioEventLoop extends SingleThreadEventLoop {
try {
for (i = selectedKeys.iterator(); i.hasNext();) {
final SelectionKey k = i.next();
final AbstractNioChannel ch = (AbstractNioChannel) k.attachment();
final NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
unsafe.flushNow();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidFuture());
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
processSelectedKey(k, (NioTask<SelectableChannel>) a);
}
if (cleanedCancelledKeys) {
@ -304,12 +327,58 @@ final class NioEventLoop extends SingleThreadEventLoop {
}
}
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
unsafe.flushNow();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidFuture());
}
}
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
boolean success = false;
try {
task.channelReady(k.channel(), k);
success = true;
} catch (Exception e) {
logger.warn("Unexpected exception while running NioTask.channelReady() - cancelling the key", e);
} finally {
if (!success) {
k.cancel();
}
if (!k.isValid()) { // Either cancelled by channelReady() or by this method.
invokeChannelUnregistered(task, k);
}
}
}
private void closeAll() {
SelectorUtil.cleanupKeys(selector);
Set<SelectionKey> keys = selector.keys();
Collection<Channel> channels = new ArrayList<Channel>(keys.size());
for (SelectionKey k: keys) {
channels.add((Channel) k.attachment());
Object a = k.attachment();
if (a instanceof Channel) {
channels.add((Channel) a);
} else {
k.cancel();
invokeChannelUnregistered((NioTask<SelectableChannel>) a, k);
}
}
for (Channel ch: channels) {
@ -317,6 +386,14 @@ final class NioEventLoop extends SingleThreadEventLoop {
}
}
private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k) {
try {
task.channelUnregistered(k.channel());
} catch (Exception e) {
logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (wakenUp.compareAndSet(false, true)) {

View File

@ -0,0 +1,37 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
/**
* An arbitrary task that can be executed by {@link NioEventLoop} when a {@link SelectableChannel} becomes ready.
*
* @see NioEventLoop#register(SelectableChannel, int, NioTask)
*/
public interface NioTask<C extends SelectableChannel> {
/**
* Invoked when the {@link SelectableChannel} has been selected by the {@link java.nio.channels.Selector}.
*/
void channelReady(C ch, SelectionKey key) throws Exception;
/**
* Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus
* this {@link NioTask} will not be notified anymore.
*/
void channelUnregistered(C ch) throws Exception;
}