diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java index 113db6d8b9..0c697d8c5b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioEventLoop.java @@ -18,6 +18,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelTaskScheduler; +import io.netty.channel.EventLoopException; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.socket.nio.AbstractNioChannel.NioUnsafe; import io.netty.logging.InternalLogger; @@ -25,7 +26,6 @@ import io.netty.logging.InternalLoggerFactory; import java.io.IOException; import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * {@link Selector} and so does the multi-plexing of these in the event loop. * */ -final class NioEventLoop extends SingleThreadEventLoop { +public final class NioEventLoop extends SingleThreadEventLoop { /** * Internal Netty logger. @@ -97,39 +97,76 @@ final class NioEventLoop extends SingleThreadEventLoop { return new ConcurrentLinkedQueue(); } + /** + * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector} + * of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will + * be executed by this event loop when the {@link SelectableChannel} is ready. + */ + public void register(final SelectableChannel ch, final int interestOps, final NioTask task) { + if (ch == null) { + throw new NullPointerException("ch"); + } + if (interestOps == 0) { + throw new IllegalArgumentException("interestOps must be non-zero."); + } + if ((interestOps & ~ch.validOps()) != 0) { + throw new IllegalArgumentException( + "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ")"); + } + if (task == null) { + throw new NullPointerException("task"); + } + + if (isShutdown()) { + throw new IllegalStateException("event loop shut down"); + } + + try { + ch.register(selector, interestOps, task); + } catch (Exception e) { + throw new EventLoopException("failed to register a channel", e); + } + } + // Create a new selector and "transfer" all channels from the old // selector to the new one private Selector recreateSelector() { - Selector newSelector = openSelector(); - Selector selector = this.selector; - this.selector = newSelector; + final Selector newSelector = openSelector(); + final Selector oldSelector = this.selector; - // loop over all the keys that are registered with the old Selector - // and register them with the new one - for (SelectionKey key: selector.keys()) { - SelectableChannel ch = key.channel(); - int ops = key.interestOps(); - Object att = key.attachment(); - // cancel the old key - cancel(key); - - try { - // register the channel with the new selector now - ch.register(newSelector, ops, att); - } catch (ClosedChannelException e) { - // close channel - AbstractNioChannel channel = (AbstractNioChannel) att; - channel.unsafe().close(channel.unsafe().voidFuture()); + // Register all channels to the new Selector. + boolean success = false; + try { + for (SelectionKey key: oldSelector.keys()) { + key.channel().register(newSelector, key.interestOps(), key.attachment()); + } + success = true; + } catch (Exception e) { + logger.warn("Failed to re-register a Channel to the new Selector.", e); + } finally { + if (!success) { + try { + newSelector.close(); + } catch (Exception e) { + logger.warn("Failed to close the new Selector.", e); + } } } - try { - // time to close the old selector as everything else is registered to the new one - selector.close(); - } catch (Throwable t) { - logger.warn("Failed to close a selector.", t); + + if (!success) { + // Keep using the old Selector on failure. + return oldSelector; } - logger.warn("Recreated Selector because of possible jdk epoll(..) bug"); - return newSelector; + + // Registration to the new Selector is done. Close the old Selector to cancel all old keys. + try { + selector.close(); + } catch (Exception e) { + logger.warn("Failed to close the old selector.", e); + } + + logger.info("Selector migration complete."); + return this.selector = newSelector; } @Override @@ -266,25 +303,11 @@ final class NioEventLoop extends SingleThreadEventLoop { try { for (i = selectedKeys.iterator(); i.hasNext();) { final SelectionKey k = i.next(); - final AbstractNioChannel ch = (AbstractNioChannel) k.attachment(); - final NioUnsafe unsafe = ch.unsafe(); - try { - int readyOps = k.readyOps(); - if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { - unsafe.read(); - if (!ch.isOpen()) { - // Connection already closed - no need to handle write. - continue; - } - } - if ((readyOps & SelectionKey.OP_WRITE) != 0) { - unsafe.flushNow(); - } - if ((readyOps & SelectionKey.OP_CONNECT) != 0) { - unsafe.finishConnect(); - } - } catch (CancelledKeyException ignored) { - unsafe.close(unsafe.voidFuture()); + final Object a = k.attachment(); + if (a instanceof AbstractNioChannel) { + processSelectedKey(k, (AbstractNioChannel) a); + } else { + processSelectedKey(k, (NioTask) a); } if (cleanedCancelledKeys) { @@ -304,12 +327,58 @@ final class NioEventLoop extends SingleThreadEventLoop { } } + private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { + final NioUnsafe unsafe = ch.unsafe(); + try { + int readyOps = k.readyOps(); + if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { + unsafe.read(); + if (!ch.isOpen()) { + // Connection already closed - no need to handle write. + return; + } + } + if ((readyOps & SelectionKey.OP_WRITE) != 0) { + unsafe.flushNow(); + } + if ((readyOps & SelectionKey.OP_CONNECT) != 0) { + unsafe.finishConnect(); + } + } catch (CancelledKeyException ignored) { + unsafe.close(unsafe.voidFuture()); + } + } + + private static void processSelectedKey(SelectionKey k, NioTask task) { + boolean success = false; + try { + task.channelReady(k.channel(), k); + success = true; + } catch (Exception e) { + logger.warn("Unexpected exception while running NioTask.channelReady() - cancelling the key", e); + } finally { + if (!success) { + k.cancel(); + } + + if (!k.isValid()) { // Either cancelled by channelReady() or by this method. + invokeChannelUnregistered(task, k); + } + } + } + private void closeAll() { SelectorUtil.cleanupKeys(selector); Set keys = selector.keys(); Collection channels = new ArrayList(keys.size()); for (SelectionKey k: keys) { - channels.add((Channel) k.attachment()); + Object a = k.attachment(); + if (a instanceof Channel) { + channels.add((Channel) a); + } else { + k.cancel(); + invokeChannelUnregistered((NioTask) a, k); + } } for (Channel ch: channels) { @@ -317,6 +386,14 @@ final class NioEventLoop extends SingleThreadEventLoop { } } + private static void invokeChannelUnregistered(NioTask task, SelectionKey k) { + try { + task.channelUnregistered(k.channel()); + } catch (Exception e) { + logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e); + } + } + @Override protected void wakeup(boolean inEventLoop) { if (wakenUp.compareAndSet(false, true)) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioTask.java b/transport/src/main/java/io/netty/channel/socket/nio/NioTask.java new file mode 100644 index 0000000000..e46e951389 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioTask.java @@ -0,0 +1,37 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket.nio; + +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; + +/** + * An arbitrary task that can be executed by {@link NioEventLoop} when a {@link SelectableChannel} becomes ready. + * + * @see NioEventLoop#register(SelectableChannel, int, NioTask) + */ +public interface NioTask { + /** + * Invoked when the {@link SelectableChannel} has been selected by the {@link java.nio.channels.Selector}. + */ + void channelReady(C ch, SelectionKey key) throws Exception; + + /** + * Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus + * this {@link NioTask} will not be notified anymore. + */ + void channelUnregistered(C ch) throws Exception; +}