diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 99a17d414b..f8a9e98e68 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -62,6 +62,11 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl return inEventLoop(Thread.currentThread()); } + @Override + public EventExecutor unwrap() { + return this; + } + @Override public Future shutdownGracefully() { return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index f8295b7a58..cd45b97bd0 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -44,7 +44,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { CANCELLATION_CAUSE_HOLDER.cause.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); } - private final EventExecutor executor; + EventExecutor executor; private volatile Object result; diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutor.java b/common/src/main/java/io/netty/util/concurrent/EventExecutor.java index 96fb747987..6545c9b97b 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutor.java @@ -54,6 +54,25 @@ public interface EventExecutor extends EventExecutorGroup { */ boolean inEventLoop(Thread thread); + /** + * Returns an {@link EventExecutor} that is not a {@link WrappedEventExecutor}. + * + *
    + *
  • + * A {@link WrappedEventExecutor} implementing this method must return the underlying + * {@link EventExecutor} while making sure that it's not a {@link WrappedEventExecutor} + * (e.g. by multiple calls to {@link #unwrap()}). + *
  • + *
  • + * An {@link EventExecutor} that is not a {@link WrappedEventExecutor} must return a reference to itself. + *
  • + *
  • + * This method must not return null. + *
  • + *
+ */ + EventExecutor unwrap(); + /** * Return a new {@link Promise}. */ diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index b9227d7913..d2ea29e3d9 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -233,8 +233,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor { throw new IllegalArgumentException( String.format("delay: %d (expected: >= 0)", delay)); } - return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); + return schedule(new ScheduledFutureTask(this, delayedTaskQueue, + Executors.callable(command, null), ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index 4c471650e4..f568ea032f 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -37,10 +37,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto /** * @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance. - * If {@code executor} is {@code null} this number will also be the parallelism - * requested from the default executor. It is generally advised for the number - * of {@link EventExecutor}s and the number of {@link Thread}s used by the - * {@code executor} to lie very close together. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventExecutor}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. * @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ @@ -50,10 +50,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto /** * @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance. - * If {@code executor} is {@code null} this number will also be the parallelism - * requested from the default executor. It is generally advised for the number - * of {@link EventExecutor}s and the number of {@link Thread}s used by the - * {@code executor} to lie very close together. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventExecutor}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. * @param executor the {@link Executor} to use, or {@code null} if the default should be used. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ diff --git a/common/src/main/java/io/netty/util/concurrent/PausableEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/PausableEventExecutor.java new file mode 100644 index 0000000000..0906a097a7 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/PausableEventExecutor.java @@ -0,0 +1,41 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import java.util.concurrent.RejectedExecutionException; + +/** + * Implement this interface if you need your {@link EventExecutor} implementation to be able + * to reject new work. + */ +public interface PausableEventExecutor extends EventExecutor, WrappedEventExecutor { + + /** + * After a call to this method the {@link EventExecutor} may throw a {@link RejectedExecutionException} when + * attempting to assign new work to it (i.e. through a call to {@link EventExecutor#execute(Runnable)}). + */ + void rejectNewTasks(); + + /** + * With a call to this method the {@link EventExecutor} signals that it is now accepting new work. + */ + void acceptNewTasks(); + + /** + * Returns {@code true} if and only if this {@link EventExecutor} is accepting a new task. + */ + boolean isAcceptingNewTasks(); +} diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index c7e4d3eafa..25541b1c57 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -16,6 +16,9 @@ package io.netty.util.concurrent; +import io.netty.util.internal.CallableEventExecutorAdapter; +import io.netty.util.internal.OneTimeTask; + import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; @@ -36,36 +39,26 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu } private final long id = nextTaskId.getAndIncrement(); - private final Queue> delayedTaskQueue; + private volatile Queue> delayedTaskQueue; private long deadlineNanos; /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ private final long periodNanos; - ScheduledFutureTask( - EventExecutor executor, Queue> delayedTaskQueue, - Runnable runnable, V result, long nanoTime) { - - this(executor, delayedTaskQueue, toCallable(runnable, result), nanoTime); - } - - ScheduledFutureTask( - EventExecutor executor, Queue> delayedTaskQueue, - Callable callable, long nanoTime, long period) { - - super(executor, callable); + ScheduledFutureTask(EventExecutor executor, Queue> delayedTaskQueue, + Callable callable, long nanoTime, long period) { + super(executor.unwrap(), callable); if (period == 0) { throw new IllegalArgumentException("period: 0 (expected: != 0)"); } + this.delayedTaskQueue = delayedTaskQueue; deadlineNanos = nanoTime; periodNanos = period; } - ScheduledFutureTask( - EventExecutor executor, Queue> delayedTaskQueue, - Callable callable, long nanoTime) { - - super(executor, callable); + ScheduledFutureTask(EventExecutor executor, Queue> delayedTaskQueue, + Callable callable, long nanoTime) { + super(executor.unwrap(), callable); this.delayedTaskQueue = delayedTaskQueue; deadlineNanos = nanoTime; periodNanos = 0; @@ -73,7 +66,7 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu @Override protected EventExecutor executor() { - return super.executor(); + return executor; } public long deadlineNanos() { @@ -117,25 +110,41 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu @Override public void run() { assert executor().inEventLoop(); + try { - if (periodNanos == 0) { - if (setUncancellableInternal()) { - V result = task.call(); - setSuccessInternal(result); + if (isMigrationPending()) { + scheduleWithNewExecutor(); + } else if (needsLaterExecution()) { + if (!executor().isShutdown()) { + // Try again in ten microseconds. + deadlineNanos = nanoTime() + TimeUnit.MICROSECONDS.toNanos(10); + if (!isCancelled()) { + delayedTaskQueue.add(this); + } } } else { - // check if is done as it may was cancelled - if (!isCancelled()) { - task.call(); - if (!executor().isShutdown()) { - long p = periodNanos; - if (p > 0) { - deadlineNanos += p; - } else { - deadlineNanos = nanoTime() - p; - } - if (!isCancelled()) { - delayedTaskQueue.add(this); + // delayed tasks executed once + if (periodNanos == 0) { + if (setUncancellableInternal()) { + V result = task.call(); + setSuccessInternal(result); + } + // periodically executed tasks + } else { + if (!isCancelled()) { + task.call(); + if (!executor().isShutdown()) { + // repeat task at fixed rate + if (periodNanos > 0) { + deadlineNanos += periodNanos; + // repeat task with fixed delay + } else { + deadlineNanos = nanoTime() - periodNanos; + } + + if (!isCancelled()) { + delayedTaskQueue.add(this); + } } } } @@ -158,4 +167,48 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu buf.append(')'); return buf; } + + /** + * When this condition is met it usually means that the channel associated with this task + * was deregistered from its eventloop and has not yet been registered with another eventloop. + */ + private boolean needsLaterExecution() { + return task instanceof CallableEventExecutorAdapter && + ((CallableEventExecutorAdapter) task).executor() instanceof PausableEventExecutor && + !((PausableEventExecutor) ((CallableEventExecutorAdapter) task).executor()).isAcceptingNewTasks(); + } + + /** + * When this condition is met it usually means that the channel associated with this task + * was re-registered with another eventloop, after having been deregistered beforehand. + */ + private boolean isMigrationPending() { + return !isCancelled() && + task instanceof CallableEventExecutorAdapter && + executor() != ((CallableEventExecutorAdapter) task).executor().unwrap(); + } + + private void scheduleWithNewExecutor() { + EventExecutor newExecutor = ((CallableEventExecutorAdapter) task).executor().unwrap(); + + if (newExecutor instanceof SingleThreadEventExecutor) { + if (!newExecutor.isShutdown()) { + executor = newExecutor; + delayedTaskQueue = ((SingleThreadEventExecutor) newExecutor).delayedTaskQueue; + + executor.execute(new OneTimeTask() { + @Override + public void run() { + // Execute as soon as possible. + deadlineNanos = nanoTime(); + if (!isCancelled()) { + delayedTaskQueue.add(ScheduledFutureTask.this); + } + } + }); + } + } else { + throw new UnsupportedOperationException("task migration unsupported"); + } + } } diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index c88a395c74..b81b169665 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -15,7 +15,9 @@ */ package io.netty.util.concurrent; +import io.netty.util.internal.CallableEventExecutorAdapter; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.RunnableEventExecutorAdapter; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -128,7 +130,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it. * @param executor the {@link Executor} which will be used for executing. * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up - * the executor thread. + * the executor thread. */ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { super(parent); @@ -731,7 +733,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { String.format("delay: %d (expected: >= 0)", delay)); } return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); + this, delayedTaskQueue, toCallable(command), ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); } @Override @@ -768,7 +770,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, Executors.callable(command, null), + this, delayedTaskQueue, toCallable(command), ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); } @@ -790,7 +792,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } return schedule(new ScheduledFutureTask( - this, delayedTaskQueue, Executors.callable(command, null), + this, delayedTaskQueue, toCallable(command), ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); } @@ -813,6 +815,14 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return task; } + private static Callable toCallable(final Runnable command) { + if (command instanceof RunnableEventExecutorAdapter) { + return new RunnableToCallableAdapter((RunnableEventExecutorAdapter) command); + } else { + return Executors.callable(command, null); + } + } + protected void cleanupAndTerminate(boolean success) { for (;;) { int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); @@ -865,7 +875,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } } - protected void scheduleExecution() { + protected final void scheduleExecution() { updateThread(null); executor.execute(AS_RUNNABLE); } @@ -886,4 +896,29 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { } } } + + private static class RunnableToCallableAdapter implements CallableEventExecutorAdapter { + + final RunnableEventExecutorAdapter runnable; + + RunnableToCallableAdapter(RunnableEventExecutorAdapter runnable) { + this.runnable = runnable; + } + + @Override + public EventExecutor executor() { + return runnable.executor(); + } + + @Override + public Callable unwrap() { + return null; + } + + @Override + public Void call() throws Exception { + runnable.run(); + return null; + } + } } diff --git a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java b/common/src/main/java/io/netty/util/concurrent/WrappedEventExecutor.java similarity index 51% rename from transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java rename to common/src/main/java/io/netty/util/concurrent/WrappedEventExecutor.java index 979e80ee98..de3d6bc76d 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java +++ b/common/src/main/java/io/netty/util/concurrent/WrappedEventExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2012 The Netty Project + * Copyright 2014 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -13,22 +13,15 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.channel.nio; -import io.netty.channel.AbstractEventLoopTest; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; +package io.netty.util.concurrent; -public class NioEventLoopTest extends AbstractEventLoopTest { +/** + * A marker interface indicating that the {@link EventExecutor} is a wrapper around + * another {@link EventExecutor} implementation. + * + * See {@link EventExecutor#unwrap()} and {@link PausableEventExecutor} for further details. + */ +public interface WrappedEventExecutor extends EventExecutor { - @Override - protected EventLoopGroup newEventLoopGroup() { - return new NioEventLoopGroup(); - } - - @Override - protected Class newChannel() { - return NioServerSocketChannel.class; - } } diff --git a/common/src/main/java/io/netty/util/internal/CallableEventExecutorAdapter.java b/common/src/main/java/io/netty/util/internal/CallableEventExecutorAdapter.java new file mode 100644 index 0000000000..419185de11 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/CallableEventExecutorAdapter.java @@ -0,0 +1,43 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import io.netty.util.concurrent.EventExecutor; + +import java.util.concurrent.Callable; + +/** + * Generic interface to wrap an {@link EventExecutor} and a {@link Callable}. + * + * This interface is used internally to ensure scheduled tasks are executed by + * the correct {@link EventExecutor} after channel migration. + * + * @see io.netty.util.concurrent.ScheduledFutureTask + * @see io.netty.util.concurrent.SingleThreadEventExecutor + * + * @param the result type of method {@link Callable#call()}. + */ +public interface CallableEventExecutorAdapter extends Callable { + /** + * Returns the wrapped {@link EventExecutor}. + */ + EventExecutor executor(); + + /** + * Returns the wrapped {@link Callable}. + */ + Callable unwrap(); +} diff --git a/common/src/main/java/io/netty/util/internal/RunnableEventExecutorAdapter.java b/common/src/main/java/io/netty/util/internal/RunnableEventExecutorAdapter.java new file mode 100644 index 0000000000..264f6ecd07 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/RunnableEventExecutorAdapter.java @@ -0,0 +1,39 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import io.netty.util.concurrent.EventExecutor; + +/** + * Generic interface to wrap an {@link EventExecutor} and a {@link Runnable}. + * + * This interface is used internally to ensure scheduled tasks are executed by + * the correct {@link EventExecutor} after channel migration. + * + * @see io.netty.util.concurrent.ScheduledFutureTask + * @see io.netty.util.concurrent.SingleThreadEventExecutor + */ +public interface RunnableEventExecutorAdapter extends Runnable { + /** + * @return the wrapped {@link EventExecutor}. + */ + EventExecutor executor(); + + /** + * @return the wrapped {@link Runnable}. + */ + Runnable unwrap(); +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 79fef4403c..bbe636561b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -98,7 +98,7 @@ abstract class AbstractEpollChannel extends AbstractChannel { @Override protected void doDeregister() throws Exception { - ((EpollEventLoop) eventLoop()).remove(this); + ((EpollEventLoop) eventLoop().unwrap()).remove(this); } @Override @@ -154,14 +154,13 @@ abstract class AbstractEpollChannel extends AbstractChannel { private void modifyEvents() { if (isOpen()) { - ((EpollEventLoop) eventLoop()).modify(this); + ((EpollEventLoop) eventLoop().unwrap()).modify(this); } } @Override protected void doRegister() throws Exception { - EpollEventLoop loop = (EpollEventLoop) eventLoop(); - loop.add(this); + ((EpollEventLoop) eventLoop().unwrap()).add(this); } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 8574626c92..cbb84d43a8 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -24,12 +24,12 @@ import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.NotYetConnectedException; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; /** * A skeletal {@link Channel} implementation. @@ -59,7 +59,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private volatile SocketAddress localAddress; private volatile SocketAddress remoteAddress; - private volatile EventLoop eventLoop; + private volatile PausableChannelEventLoop eventLoop; private volatile boolean registered; /** Cache for the string representation of this channel */ @@ -119,7 +119,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public EventLoop eventLoop() { + public final EventLoop eventLoop() { EventLoop eventLoop = this.eventLoop; if (eventLoop == null) { throw new IllegalStateException("channel not registered to an event loop"); @@ -198,6 +198,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public ChannelFuture deregister() { + /** + * One problem of channel deregistration is that after a channel has been deregistered + * there may still be tasks, created from within one of the channel's ChannelHandlers, + * in the {@link EventLoop}'s task queue. That way, an unfortunate twist of events could lead + * to tasks still being in the old {@link EventLoop}'s queue even after the channel has been + * registered with a new {@link EventLoop}. This would lead to the tasks being executed by two + * different {@link EventLoop}s. + * + * Our solution to this problem is to always perform the actual deregistration of + * the channel as a task and to reject any submission of new tasks, from within + * one of the channel's ChannelHandlers, until the channel is registered with + * another {@link EventLoop}. That way we can be sure that there are no more tasks regarding + * that particular channel after it has been deregistered (because the deregistration + * task is the last one.). + * + * This only works for one time tasks. To see how we handle periodic/delayed tasks have a look + * at {@link io.netty.util.concurrent.ScheduledFutureTask#run()}. + * + * Also see {@link HeadContext#deregister(ChannelHandlerContext, ChannelPromise)}. + */ + eventLoop.rejectNewTasks(); return pipeline.deregister(); } @@ -234,6 +255,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public ChannelFuture deregister(ChannelPromise promise) { + eventLoop.rejectNewTasks(); return pipeline.deregister(promise); } @@ -401,7 +423,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public final ChannelHandlerInvoker invoker() { - return eventLoop().asInvoker(); + // return the unwrapped invoker. + return ((PausableChannelEventLoop) eventLoop().asInvoker()).unwrapInvoker(); } @Override @@ -424,6 +447,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (eventLoop == null) { throw new NullPointerException("eventLoop"); } + if (promise == null) { + throw new NullPointerException("promise"); + } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; @@ -434,7 +460,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return; } - AbstractChannel.this.eventLoop = eventLoop; + // It's necessary to reuse the wrapped eventloop object. Otherwise the user will end up with multiple + // objects that do not share a common state. + if (AbstractChannel.this.eventLoop == null) { + AbstractChannel.this.eventLoop = new PausableChannelEventLoop(eventLoop); + } else { + AbstractChannel.this.eventLoop.unwrapped = eventLoop; + } if (eventLoop.inEventLoop()) { register0(promise); @@ -466,6 +498,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } doRegister(); registered = true; + AbstractChannel.this.eventLoop.acceptNewTasks(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { @@ -587,17 +620,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); } finally { - if (wasActive && !isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelInactive(); + deregister(voidPromise()); + } + }); + } else { + invokeLater(new OneTimeTask() { + @Override + public void run() { + deregister(voidPromise()); } }); } - - deregister(voidPromise()); } } @@ -610,6 +648,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + /** + * This method must NEVER be called directly, but be executed as an + * extra task with a clean call stack instead. The reason for this + * is that this method calls {@link ChannelPipeline#fireChannelUnregistered()} + * directly, which might lead to an unfortunate nesting of independent inbound/outbound + * events. See the comments in {@link #invokeLater(Runnable)} for more details. + */ @Override public final void deregister(final ChannelPromise promise) { if (!promise.setUncancellable()) { @@ -624,17 +669,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doDeregister(); } catch (Throwable t) { + safeSetFailure(promise, t); logger.warn("Unexpected exception occurred while deregistering a channel.", t); } finally { if (registered) { registered = false; - invokeLater(new OneTimeTask() { - @Override - public void run() { - pipeline.fireChannelUnregistered(); - } - }); safeSetSuccess(promise); + pipeline.fireChannelUnregistered(); } else { // Some transports like local and AIO does not allow the deregistration of // an open channel. Their doDeregister() calls close(). Consequently, @@ -792,7 +833,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha // -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet // // which means the execution of two inbound handler methods of the same handler overlap undesirably. - eventLoop().execute(task); + eventLoop().unwrap().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } @@ -895,4 +936,70 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return super.trySuccess(); } } + + private final class PausableChannelEventLoop + extends PausableChannelEventExecutor implements EventLoop { + + volatile boolean isAcceptingNewTasks = true; + volatile EventLoop unwrapped; + + PausableChannelEventLoop(EventLoop unwrapped) { + this.unwrapped = unwrapped; + } + + @Override + public void rejectNewTasks() { + isAcceptingNewTasks = false; + } + + @Override + public void acceptNewTasks() { + isAcceptingNewTasks = true; + } + + @Override + public boolean isAcceptingNewTasks() { + return isAcceptingNewTasks; + } + + @Override + public EventLoopGroup parent() { + return unwrap().parent(); + } + + @Override + public EventLoop next() { + return unwrap().next(); + } + + @Override + public EventLoop unwrap() { + return unwrapped; + } + + @Override + public ChannelHandlerInvoker asInvoker() { + return this; + } + + @Override + public ChannelFuture register(Channel channel) { + return unwrap().register(channel); + } + + @Override + public ChannelFuture register(Channel channel, ChannelPromise promise) { + return unwrap().register(channel, promise); + } + + @Override + Channel channel() { + return AbstractChannel.this; + } + + @Override + ChannelHandlerInvoker unwrapInvoker() { + return unwrapped.asInvoker(); + } + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index ad0b51ff91..ad914d05ee 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -22,13 +22,22 @@ import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.PausableEventExecutor; +import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; - import java.net.SocketAddress; import java.util.WeakHashMap; -abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { +/** + * Abstract base class for {@link ChannelHandlerContext} implementations. + *

+ * The unusual is-a relationship with {@link PausableChannelEventExecutor} was put in place to avoid + * additional object allocations that would have been required to wrap the return values of {@link #executor()} + * and {@link #invoker()}. + */ +abstract class AbstractChannelHandlerContext + extends PausableChannelEventExecutor implements ChannelHandlerContext, ResourceLeakHint { // This class keeps an integer member field 'skipFlags' whose each bit tells if the corresponding handler method // is annotated with @Skip. 'skipFlags' is retrieved in runtime via the reflection API and is cached. @@ -237,7 +246,11 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R if (executor.inEventLoop()) { teardown0(); } else { - executor.execute(new Runnable() { + /** + * use unwrap(), because the executor will usually be a {@link PausableEventExecutor} + * that might not accept any new tasks. + */ + executor().unwrap().execute(new OneTimeTask() { @Override public void run() { teardown0(); @@ -257,7 +270,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R } @Override - public Channel channel() { + public final Channel channel() { return channel; } @@ -272,16 +285,13 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R } @Override - public EventExecutor executor() { - return invoker().executor(); + public final EventExecutor executor() { + return this; } @Override - public ChannelHandlerInvoker invoker() { - if (invoker == null) { - return channel.unsafe().invoker(); - } - return invoker; + public final ChannelHandlerInvoker invoker() { + return this; } @Override @@ -543,4 +553,36 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R public String toString() { return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')'; } + + @Override + public EventExecutor unwrap() { + return unwrapInvoker().executor(); + } + + @Override + public void rejectNewTasks() { + /** + * This cast is correct because {@link #channel()} always returns an {@link AbstractChannel} and + * {@link AbstractChannel#eventLoop()} always returns a {@link PausableEventExecutor}. + */ + ((PausableEventExecutor) channel().eventLoop()).rejectNewTasks(); + } + + @Override + public void acceptNewTasks() { + ((PausableEventExecutor) channel().eventLoop()).acceptNewTasks(); + } + + @Override + public boolean isAcceptingNewTasks() { + return ((PausableEventExecutor) channel().eventLoop()).isAcceptingNewTasks(); + } + + @Override + ChannelHandlerInvoker unwrapInvoker() { + if (invoker == null) { + return channel().unsafe().invoker(); + } + return invoker; + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractEventLoop.java b/transport/src/main/java/io/netty/channel/AbstractEventLoop.java index 7a39d7e9d9..2869197fc3 100644 --- a/transport/src/main/java/io/netty/channel/AbstractEventLoop.java +++ b/transport/src/main/java/io/netty/channel/AbstractEventLoop.java @@ -38,4 +38,9 @@ public abstract class AbstractEventLoop extends AbstractEventExecutor implements public EventLoop next() { return (EventLoop) super.next(); } + + @Override + public EventLoop unwrap() { + return this; + } } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index f7218bee48..0e66f714e7 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -24,6 +24,8 @@ import io.netty.channel.socket.SocketChannel; import io.netty.util.AttributeMap; import io.netty.util.concurrent.FutureListener; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -281,7 +283,7 @@ public interface Channel extends AttributeMap, Comparable { ChannelFuture close(); /** - * Request to deregister this {@link Channel} from the previous assigned {@link EventLoop} and notify the + * Request to deregister this {@link Channel} from its assigned {@link EventLoop} and notify the * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of * an error. *

@@ -289,7 +291,19 @@ public interface Channel extends AttributeMap, Comparable { * {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. - * + *

+ * After this method completes (not the {@link ChannelFuture}!) one can not submit new tasks to the + * {@link Channel}'s {@link EventLoop} until the {@link Channel} is again registered with an {@link EventLoop}. + * Any attempt to do so will result in a {@link RejectedExecutionException} being thrown. + * Any tasks that were submitted before the call to {@link #deregister()} will finish before the + * {@link ChannelFuture} completes. Furthermore, periodic and delayed tasks will not be executed until the + * {@link Channel} is registered with an {@link EventLoop} again. Theses are tasks submitted + * to the {@link EventLoop} via one of the methods declared by {@link ScheduledExecutorService}. + * Please note that all of the above only applies to tasks created from within the deregistered {@link Channel}'s + * {@link ChannelHandler}s. + *

+ * It's only safe to {@linkplain EventLoop#register(Channel)} the {@link Channel} with another (or the same) + * {@link EventLoop} after the {@link ChannelFuture} has completed. */ ChannelFuture deregister(); @@ -367,16 +381,27 @@ public interface Channel extends AttributeMap, Comparable { ChannelFuture close(ChannelPromise promise); /** - * Request to deregister this {@link Channel} from the previous assigned {@link EventLoop} and notify the - * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of + * Request to deregister this {@link Channel} from its assigned {@link EventLoop} and notify the + * {@link ChannelPromise} once the operation completes, either because the operation was successful or because of * an error. - * - * The given {@link ChannelPromise} will be notified. *

* This will result in having the * {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. + *

+ * After this method completes (not the {@link ChannelPromise}!) one can not submit new tasks to the + * {@link Channel}'s {@link EventLoop} until the {@link Channel} is again registered with an {@link EventLoop}. + * Any attempt to do so will result in a {@link RejectedExecutionException} being thrown. + * Any tasks that were submitted before the call to {@link #deregister()} will finish before the + * {@link ChannelPromise} completes. Furthermore, periodic and delayed tasks will not be executed until the + * {@link Channel} is registered with an {@link EventLoop} again. Theses are tasks submitted + * to the {@link EventLoop} via one of the methods declared by {@link ScheduledExecutorService}. + * Please note that all of the above only applies to tasks created from within the deregistered {@link Channel}'s + * {@link ChannelHandler}s. + *

+ * It's only safe to {@linkplain EventLoop#register(Channel)} the {@link Channel} with another (or the same) + * {@link EventLoop} after the {@link ChannelPromise} has completed. */ ChannelFuture deregister(ChannelPromise promise); @@ -459,6 +484,11 @@ public interface Channel extends AttributeMap, Comparable { /** * Register the {@link Channel} of the {@link ChannelPromise} and notify * the {@link ChannelFuture} once the registration was complete. + *

+ * It's only safe to submit a new task to the {@link EventLoop} from within a + * {@link ChannelHandler} once the {@link ChannelPromise} succeeded. Otherwise + * the task may or may not be rejected. + *

*/ void register(EventLoop eventLoop, ChannelPromise promise); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 753135e8ca..10b468a7e6 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -19,6 +19,8 @@ import io.netty.channel.Channel.Unsafe; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.PausableEventExecutor; +import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -61,7 +63,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext tail; private final Map name2ctx = - new HashMap(4); + new HashMap(4); /** * @see #findInvoker(EventExecutorGroup) @@ -420,15 +422,15 @@ final class DefaultChannelPipeline implements ChannelPipeline { remove0(ctx); return ctx; } else { - future = ctx.executor().submit(new Runnable() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - remove0(ctx); - } - } - }); - context = ctx; + future = ctx.executor().submit(new Runnable() { + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + remove0(ctx); + } + } + }); + context = ctx; } } @@ -620,7 +622,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public void run() { callHandlerRemoved0(ctx); - } + } }); return; } @@ -1188,8 +1190,16 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - unsafe.deregister(promise); + public void deregister(ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { + assert !((PausableEventExecutor) ctx.channel().eventLoop()).isAcceptingNewTasks(); + + // submit deregistration task + ctx.channel().eventLoop().unwrap().execute(new OneTimeTask() { + @Override + public void run() { + unsafe.deregister(promise); + } + }); } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java b/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java index cfb48de3bd..e84413f3ed 100644 --- a/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java @@ -36,10 +36,10 @@ public class DefaultEventLoopGroup extends MultithreadEventLoopGroup { /** * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. - * If {@code executor} is {@code null} this number will also be the parallelism - * requested from the default executor. It is generally advised for the number - * of {@link EventLoop}s and the number of {@link Thread}s used by the - * {@code executor} to lie very close together. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. */ public DefaultEventLoopGroup(int nEventLoops) { this(nEventLoops, (Executor) null); @@ -47,10 +47,10 @@ public class DefaultEventLoopGroup extends MultithreadEventLoopGroup { /** * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. - * If {@code executor} is {@code null} this number will also be the parallelism - * requested from the default executor. It is generally advised for the number - * of {@link EventLoop}s and the number of {@link Thread}s used by the - * {@code executor} to lie very close together. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. * @param executor the {@link Executor} to use, or {@code null} if the default should be used. */ public DefaultEventLoopGroup(int nEventLoops, Executor executor) { @@ -59,10 +59,10 @@ public class DefaultEventLoopGroup extends MultithreadEventLoopGroup { /** * @param nEventLoops the number of {@link EventLoop}s that will be used by this instance. - * If {@code executor} is {@code null} this number will also be the parallelism - * requested from the default executor. It is generally advised for the number - * of {@link EventLoop}s and the number of {@link Thread}s used by the - * {@code executor} to lie very close together. + * If {@code executor} is {@code null} this number will also be the parallelism + * requested from the default executor. It is generally advised for the number + * of {@link EventLoop}s and the number of {@link Thread}s used by the + * {@code executor} to lie very close together. * @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used. */ public DefaultEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory) { diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java index de0f760cc9..b8fe89546f 100644 --- a/transport/src/main/java/io/netty/channel/EventLoop.java +++ b/transport/src/main/java/io/netty/channel/EventLoop.java @@ -29,6 +29,9 @@ public interface EventLoop extends EventExecutor, EventLoopGroup { @Override EventLoopGroup parent(); + @Override + EventLoop unwrap(); + /** * Creates a new default {@link ChannelHandlerInvoker} implementation that uses this {@link EventLoop} to * invoke event handler methods. diff --git a/transport/src/main/java/io/netty/channel/EventLoopGroup.java b/transport/src/main/java/io/netty/channel/EventLoopGroup.java index 669e61a175..75ae20680e 100644 --- a/transport/src/main/java/io/netty/channel/EventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/EventLoopGroup.java @@ -27,14 +27,25 @@ public interface EventLoopGroup extends EventExecutorGroup { EventLoop next(); /** - * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture} - * will get notified once the registration was complete. + * Register a {@link Channel} with an {@link EventLoop} from this {@link EventLoopGroup}. The returned + * {@link ChannelFuture} will get notified once the registration is completed. + *

+ * It's only safe to submit a new task to the {@link EventLoop} from within a + * {@link ChannelHandler} once the {@link ChannelPromise} succeeded. Otherwise + * the task may or may not be rejected. + *

*/ ChannelFuture register(Channel channel); /** - * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture} - * will get notified once the registration was complete and also will get returned. + * Register a {@link Channel} with an {@link EventLoop} from this {@link EventLoopGroup}. The provided + * {@link ChannelPromise} will get notified once the registration is completed. The returned {@link ChannelFuture} + * is the same {@link ChannelPromise} that was passed to the method. + *

+ * It's only safe to submit a new task to the {@link EventLoop} from within a + * {@link ChannelHandler} once the {@link ChannelPromise} succeeded. Otherwise + * the task may or may not be rejected. + *

*/ ChannelFuture register(Channel channel, ChannelPromise promise); } diff --git a/transport/src/main/java/io/netty/channel/PausableChannelEventExecutor.java b/transport/src/main/java/io/netty/channel/PausableChannelEventExecutor.java new file mode 100644 index 0000000000..cb9f804180 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/PausableChannelEventExecutor.java @@ -0,0 +1,380 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.util.internal.CallableEventExecutorAdapter; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.PausableEventExecutor; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.RunnableEventExecutorAdapter; +import io.netty.util.concurrent.ScheduledFuture; + +import java.net.SocketAddress; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +abstract class PausableChannelEventExecutor implements PausableEventExecutor, ChannelHandlerInvoker { + + abstract Channel channel(); + + abstract ChannelHandlerInvoker unwrapInvoker(); + + @Override + public void invokeFlush(ChannelHandlerContext ctx) { + unwrapInvoker().invokeFlush(ctx); + } + + @Override + public EventExecutor executor() { + return this; + } + + @Override + public void invokeChannelRegistered(ChannelHandlerContext ctx) { + unwrapInvoker().invokeChannelRegistered(ctx); + } + + @Override + public void invokeChannelUnregistered(ChannelHandlerContext ctx) { + unwrapInvoker().invokeChannelUnregistered(ctx); + } + + @Override + public void invokeChannelActive(ChannelHandlerContext ctx) { + unwrapInvoker().invokeChannelActive(ctx); + } + + @Override + public void invokeChannelInactive(ChannelHandlerContext ctx) { + unwrapInvoker().invokeChannelInactive(ctx); + } + + @Override + public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + unwrapInvoker().invokeExceptionCaught(ctx, cause); + } + + @Override + public void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event) { + unwrapInvoker().invokeUserEventTriggered(ctx, event); + } + + @Override + public void invokeChannelRead(ChannelHandlerContext ctx, Object msg) { + unwrapInvoker().invokeChannelRead(ctx, msg); + } + + @Override + public void invokeChannelReadComplete(ChannelHandlerContext ctx) { + unwrapInvoker().invokeChannelReadComplete(ctx); + } + + @Override + public void invokeChannelWritabilityChanged(ChannelHandlerContext ctx) { + unwrapInvoker().invokeChannelWritabilityChanged(ctx); + } + + @Override + public void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { + unwrapInvoker().invokeBind(ctx, localAddress, promise); + } + + @Override + public void invokeConnect( + ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + unwrapInvoker().invokeConnect(ctx, remoteAddress, localAddress, promise); + } + + @Override + public void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise) { + unwrapInvoker().invokeDisconnect(ctx, promise); + } + + @Override + public void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise) { + unwrapInvoker().invokeClose(ctx, promise); + } + + @Override + public void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise) { + unwrapInvoker().invokeDeregister(ctx, promise); + } + + @Override + public void invokeRead(ChannelHandlerContext ctx) { + unwrapInvoker().invokeRead(ctx); + } + + @Override + public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + unwrapInvoker().invokeWrite(ctx, msg, promise); + } + + @Override + public EventExecutor next() { + return unwrap().next(); + } + + @Override + public Set children() { + return unwrap().children(); + } + + @Override + public EventExecutorGroup parent() { + return unwrap().parent(); + } + + @Override + public boolean inEventLoop() { + return unwrap().inEventLoop(); + } + + @Override + public boolean inEventLoop(Thread thread) { + return unwrap().inEventLoop(thread); + } + + @Override + public Promise newPromise() { + return unwrap().newPromise(); + } + + @Override + public ProgressivePromise newProgressivePromise() { + return unwrap().newProgressivePromise(); + } + + @Override + public Future newSucceededFuture(V result) { + return unwrap().newSucceededFuture(result); + } + + @Override + public Future newFailedFuture(Throwable cause) { + return unwrap().newFailedFuture(cause); + } + + @Override + public boolean isShuttingDown() { + return unwrap().isShuttingDown(); + } + + @Override + public Future shutdownGracefully() { + return unwrap().shutdownGracefully(); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return unwrap().shutdownGracefully(quietPeriod, timeout, unit); + } + + @Override + public Future terminationFuture() { + return unwrap().terminationFuture(); + } + + @Override + @Deprecated + public void shutdown() { + unwrap().terminationFuture(); + } + + @Override + @Deprecated + public List shutdownNow() { + return unwrap().shutdownNow(); + } + + @Override + public Future submit(Runnable task) { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().submit(task, result); + } + + @Override + public Future submit(Callable task) { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().submit(task); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + + return unwrap().schedule(new ChannelRunnableEventExecutor(channel(), command), delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().schedule(new ChannelCallableEventExecutor(channel(), callable), delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().scheduleAtFixedRate( + new ChannelRunnableEventExecutor(channel(), command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().scheduleWithFixedDelay( + new ChannelRunnableEventExecutor(channel(), command), initialDelay, delay, unit); + } + + @Override + public boolean isShutdown() { + return unwrap().isShutdown(); + } + + @Override + public boolean isTerminated() { + return unwrap().isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return unwrap().awaitTermination(timeout, unit); + } + + @Override + public List> + invokeAll(Collection> tasks) throws InterruptedException { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().invokeAll(tasks); + } + + @Override + public List> + invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + return unwrap().invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + if (!isAcceptingNewTasks()) { + throw new RejectedExecutionException(); + } + unwrap().execute(command); + } + + private static final class ChannelCallableEventExecutor implements CallableEventExecutorAdapter { + + final Channel channel; + final Callable callable; + + ChannelCallableEventExecutor(Channel channel, Callable callable) { + this.channel = channel; + this.callable = callable; + } + + @Override + public EventExecutor executor() { + return channel.eventLoop(); + } + + @Override + public Callable unwrap() { + return callable; + } + + @Override + public V call() throws Exception { + return callable.call(); + } + } + + private static final class ChannelRunnableEventExecutor implements RunnableEventExecutorAdapter { + + final Channel channel; + final Runnable runnable; + + ChannelRunnableEventExecutor(Channel channel, Runnable runnable) { + this.channel = channel; + this.runnable = runnable; + } + + @Override + public EventExecutor executor() { + return channel.eventLoop(); + } + + @Override + public Runnable unwrap() { + return runnable; + } + + @Override + public void run() { + runnable.run(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 14b80b67ba..b56df1f59e 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.SingleThreadEventExecutor; import java.util.concurrent.Executor; @@ -70,6 +71,11 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return !(task instanceof NonWakeupRunnable); } + @Override + public EventLoop unwrap() { + return this; + } + /** * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. */ diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 21bb2c30ab..a159cd7344 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -190,7 +190,7 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle } @Override - public void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise) { + public void invokeDeregister(ChannelHandlerContext ctx, final ChannelPromise promise) { invokeDeregisterNow(ctx, promise); } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index eac78cb7b8..5cc2b99cf3 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -27,7 +27,6 @@ import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.internal.InternalThreadLocalMap; import java.net.SocketAddress; @@ -182,7 +181,7 @@ public class LocalChannel extends AbstractChannel { } }); } - ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook); + ((SingleThreadEventLoop) eventLoop().unwrap()).addShutdownHook(shutdownHook); } @Override @@ -238,7 +237,7 @@ public class LocalChannel extends AbstractChannel { @Override protected void doDeregister() throws Exception { // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop - ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook); + ((SingleThreadEventLoop) eventLoop().unwrap()).removeShutdownHook(shutdownHook); } @Override diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index f798e56e05..12295ccb97 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -22,7 +22,6 @@ import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.ServerChannel; import io.netty.channel.SingleThreadEventLoop; -import io.netty.util.concurrent.SingleThreadEventExecutor; import java.net.SocketAddress; import java.util.ArrayDeque; @@ -83,7 +82,7 @@ public class LocalServerChannel extends AbstractServerChannel { @Override protected void doRegister() throws Exception { - ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook); + ((SingleThreadEventLoop) eventLoop().unwrap()).addShutdownHook(shutdownHook); } @Override @@ -106,7 +105,7 @@ public class LocalServerChannel extends AbstractServerChannel { @Override protected void doDeregister() throws Exception { - ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook); + ((SingleThreadEventLoop) eventLoop().unwrap()).removeShutdownHook(shutdownHook); } @Override @@ -138,10 +137,10 @@ public class LocalServerChannel extends AbstractServerChannel { serve0(child); } else { eventLoop().execute(new Runnable() { - @Override - public void run() { - serve0(child); - } + @Override + public void run() { + serve0(child); + } }); } return child; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index e61fa94f57..3fed6c7048 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -105,11 +105,6 @@ public abstract class AbstractNioChannel extends AbstractChannel { return ch; } - @Override - public NioEventLoop eventLoop() { - return (NioEventLoop) super.eventLoop(); - } - /** * Return the current {@link SelectionKey} */ @@ -337,13 +332,13 @@ public abstract class AbstractNioChannel extends AbstractChannel { boolean selected = false; for (;;) { try { - selectionKey = javaChannel().register(eventLoop().selector, 0, this); + selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. - eventLoop().selectNow(); + ((NioEventLoop) eventLoop().unwrap()).selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached @@ -356,7 +351,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected void doDeregister() throws Exception { - eventLoop().cancel(selectionKey()); + ((NioEventLoop) eventLoop().unwrap()).cancel(selectionKey()); } @Override diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index c5b32a8e37..5803448d3a 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -18,13 +18,13 @@ package io.netty.channel.nio; import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.ExecutorFactory; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; import java.util.concurrent.Executor; -import io.netty.util.concurrent.DefaultExecutorFactory; -import io.netty.util.concurrent.ExecutorFactory; /** * A {@link MultithreadEventLoopGroup} implementation which is used for NIO {@link Selector} based {@link Channel}s. diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java deleted file mode 100644 index 24b3d7606c..0000000000 --- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.SocketChannel; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; -import org.junit.Test; - -import static org.junit.Assert.*; - -public abstract class AbstractEventLoopTest { - - /** - * Test for https://github.com/netty/netty/issues/803 - */ - @Test - public void testReregister() { - EventLoopGroup group = newEventLoopGroup(); - EventLoopGroup group2 = newEventLoopGroup(); - final EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(2); - - ServerBootstrap bootstrap = new ServerBootstrap(); - ChannelFuture future = bootstrap.channel(newChannel()).group(group) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - } - }).handler(new ChannelInitializer() { - @Override - public void initChannel(ServerSocketChannel ch) throws Exception { - ch.pipeline().addLast(new TestChannelHandler()); - ch.pipeline().addLast(eventExecutorGroup, new TestChannelHandler2()); - } - }) - .bind(0).awaitUninterruptibly(); - - EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor(); - EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor(); - future.channel().deregister().awaitUninterruptibly(); - Channel channel = group2.register(future.channel()).awaitUninterruptibly().channel(); - EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor(); - assertNotSame(executor1, executorNew); - assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor()); - } - - private static final class TestChannelHandler extends ChannelHandlerAdapter { } - - private static final class TestChannelHandler2 extends ChannelHandlerAdapter { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } - } - - protected abstract EventLoopGroup newEventLoopGroup(); - protected abstract Class newChannel(); -} diff --git a/transport/src/test/java/io/netty/channel/ChannelDeregistrationTest.java b/transport/src/test/java/io/netty/channel/ChannelDeregistrationTest.java new file mode 100644 index 0000000000..3b8ad80587 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/ChannelDeregistrationTest.java @@ -0,0 +1,389 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.NetUtil; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.PausableEventExecutor; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.*; + +/** + * These tests should work with any {@link SingleThreadEventLoop} implementation. We chose the + * {@link io.netty.channel.nio.NioEventLoop} because it's the most commonly used {@link EventLoop}. + */ +public class ChannelDeregistrationTest { + + private static final Runnable NOEXEC = new Runnable() { + @Override + public void run() { + fail(); + } + }; + + private static final Runnable NOOP = new Runnable() { + @Override + public void run() { + } + }; + + /** + * Test deregistration and re-registration of a {@link Channel} within a {@link ChannelHandler}. + * + * The first {@link ChannelHandler} in the {@link ChannelPipeline} deregisters the {@link Channel} in the + * {@linkplain ChannelHandler#channelRead(ChannelHandlerContext, Object)} method, while + * the subsequent {@link ChannelHandler}s make sure that the {@link Channel} really is deregistered. The last + * {@link ChannelHandler} registers the {@link Channel} with a new {@link EventLoop} and triggers a + * {@linkplain ChannelHandler#write(ChannelHandlerContext, Object, ChannelPromise)} event, that is then + * used by all {@link ChannelHandler}s to ensure that the {@link Channel} was correctly registered with the + * new {@link EventLoop}. + * + * Most of the {@link ChannelHandler}s in the pipeline are assigned custom {@link EventExecutorGroup}s. + * It's important to make sure that they are preserved during and after + * {@linkplain io.netty.channel.Channel#deregister()}. + */ + @Test(timeout = 2000) + public void testDeregisterFromDifferentEventExecutorGroup() throws Exception { + final AtomicBoolean handlerExecuted1 = new AtomicBoolean(); + final AtomicBoolean handlerExecuted2 = new AtomicBoolean(); + final AtomicBoolean handlerExecuted3 = new AtomicBoolean(); + final AtomicBoolean handlerExecuted4 = new AtomicBoolean(); + final AtomicBoolean handlerExecuted5 = new AtomicBoolean(); + + final EventLoopGroup group1 = new NioEventLoopGroup(1); + final EventLoopGroup group2 = new NioEventLoopGroup(1); + final EventLoopGroup group3 = new NioEventLoopGroup(1); + + ServerBootstrap serverBootstrap = new ServerBootstrap(); + Channel serverChannel = serverBootstrap.group(group1) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + // Deregister the Channel from the EventLoop group1. + ch.pipeline().addLast(new DeregisterHandler(handlerExecuted1, group1.next(), group2.next())); + // Ensure that the Channel is deregistered from EventLoop group1. Also make + // sure that despite deregistration the ChannelHandler is executed by the + // specified EventLoop. + ch.pipeline().addLast(group2, new ExpectDeregisteredHandler(handlerExecuted2, group2.next())); + ch.pipeline().addLast(group3, new ExpectDeregisteredHandler(handlerExecuted3, group3.next())); + ch.pipeline().addLast(group2, new ExpectDeregisteredHandler(handlerExecuted4, group2.next())); + // Register the Channel with EventLoop group2. + ch.pipeline().addLast(group3, + new ReregisterHandler(handlerExecuted5, group3.next(), group2.next())); + } + }).bind(0).sync().channel(); + SocketAddress address = serverChannel.localAddress(); + Socket s = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) address).getPort()); + // write garbage just so to get channelRead(...) invoked. + s.getOutputStream().write(1); + + while (!(handlerExecuted1.get() && + handlerExecuted2.get() && + handlerExecuted3.get() && + handlerExecuted4.get() && + handlerExecuted5.get())) { + Thread.sleep(10); + } + + s.close(); + serverChannel.close(); + group1.shutdownGracefully(); + group2.shutdownGracefully(); + group3.shutdownGracefully(); + } + + /** + * Make sure the {@link EventLoop} and {@link ChannelHandlerInvoker} accessible from within a + * {@link ChannelHandler} are wrapped by a {@link PausableEventExecutor}. + */ + @Test(timeout = 2000) + public void testWrappedEventLoop() throws Exception { + final AtomicBoolean channelActiveCalled1 = new AtomicBoolean(); + final AtomicBoolean channelActiveCalled2 = new AtomicBoolean(); + final EventLoopGroup group1 = new NioEventLoopGroup(); + final EventLoopGroup group2 = new NioEventLoopGroup(); + + ServerBootstrap serverBootstrap = new ServerBootstrap(); + Channel serverChannel = serverBootstrap.group(group1) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new TestChannelHandler3(channelActiveCalled1)); + ch.pipeline().addLast(group2, new TestChannelHandler4(channelActiveCalled2)); + } + }).bind(0).sync().channel(); + SocketAddress address = serverChannel.localAddress(); + Socket client = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) address).getPort()); + + while (!(channelActiveCalled1.get() && channelActiveCalled2.get())) { + Thread.sleep(10); + } + + client.close(); + serverChannel.close(); + group1.shutdownGracefully(); + group2.shutdownGracefully(); + } + + /** + * Test for https://github.com/netty/netty/issues/803 + */ + @Test + public void testReregister() throws Exception { + final EventLoopGroup group1 = new NioEventLoopGroup(); + final EventLoopGroup group2 = new NioEventLoopGroup(); + final EventExecutorGroup group3 = new DefaultEventExecutorGroup(2); + + ServerBootstrap bootstrap = new ServerBootstrap(); + ChannelFuture future = bootstrap.channel(NioServerSocketChannel.class).group(group1) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + } + }).handler(new ChannelInitializer() { + @Override + public void initChannel(ServerSocketChannel ch) throws Exception { + ch.pipeline().addLast(new TestChannelHandler1()); + ch.pipeline().addLast(group3, new TestChannelHandler2()); + } + }) + .bind(0).awaitUninterruptibly(); + + EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler1.class).executor(); + EventExecutor unwrapped1 = executor1.unwrap(); + EventExecutor executor3 = future.channel().pipeline().context(TestChannelHandler2.class).executor(); + + future.channel().deregister().sync(); + + Channel channel = group2.register(future.channel()).sync().channel(); + EventExecutor executor2 = channel.pipeline().context(TestChannelHandler1.class).executor(); + + // same wrapped executor + assertSame(executor1, executor2); + // different executor under the wrapper + assertNotSame(unwrapped1, executor2.unwrap()); + // executor3 must remain unchanged + assertSame(executor3.unwrap(), future.channel().pipeline() + .context(TestChannelHandler2.class) + .executor() + .unwrap()); + } + + private static final class TestChannelHandler1 extends ChannelHandlerAdapter { } + + private static final class TestChannelHandler2 extends ChannelHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } + } + + private static final class TestChannelHandler3 extends ChannelHandlerAdapter { + AtomicBoolean channelActiveCalled; + + TestChannelHandler3(AtomicBoolean channelActiveCalled) { + this.channelActiveCalled = channelActiveCalled; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channelActiveCalled.set(true); + + assertTrue(ctx.executor() instanceof PausableEventExecutor); + assertTrue(ctx.channel().eventLoop() instanceof PausableEventExecutor); + assertTrue(ctx.invoker().executor() instanceof PausableEventExecutor); + assertTrue(ctx.channel().eventLoop().asInvoker().executor() instanceof PausableEventExecutor); + assertSame(ctx.executor().unwrap(), ctx.channel().eventLoop().unwrap()); + + super.channelActive(ctx); + } + } + + private static final class TestChannelHandler4 extends ChannelHandlerAdapter { + AtomicBoolean channelActiveCalled; + + TestChannelHandler4(AtomicBoolean channelActiveCalled) { + this.channelActiveCalled = channelActiveCalled; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channelActiveCalled.set(true); + assertTrue(ctx.executor() instanceof PausableEventExecutor); + assertTrue(ctx.channel().eventLoop() instanceof PausableEventExecutor); + assertTrue(ctx.invoker().executor() instanceof PausableEventExecutor); + assertTrue(ctx.channel().eventLoop().asInvoker().executor() instanceof PausableEventExecutor); + + // This is executed by its own invoker, which has to be wrapped by + // a separate PausableEventExecutor. + assertNotSame(ctx.executor(), ctx.channel().eventLoop()); + assertNotSame(ctx.executor().unwrap(), ctx.channel().eventLoop().unwrap()); + + super.channelActive(ctx); + } + } + + private static final class DeregisterHandler extends ChannelHandlerAdapter { + + final AtomicBoolean handlerExecuted; + final EventLoop expectedEventLoop1; + final EventLoop expectedEventLoop2; + + DeregisterHandler(AtomicBoolean handlerExecuted, EventLoop expectedEventLoop1, EventLoop expectedEventLoop2) { + this.handlerExecuted = handlerExecuted; + this.expectedEventLoop1 = expectedEventLoop1; + this.expectedEventLoop2 = expectedEventLoop2; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + assertSame(expectedEventLoop1, ctx.executor().unwrap()); + assertTrue(ctx.channel().isRegistered()); + ctx.channel().deregister().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + assertFalse(ctx.channel().isRegistered()); + + boolean success = false; + try { + ctx.channel().eventLoop().execute(NOEXEC); + success = true; + } catch (Throwable t) { + assertTrue(t instanceof RejectedExecutionException); + } + assertFalse(success); + + handlerExecuted.set(true); + ctx.fireChannelRead(msg); + } + }); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + assertSame(expectedEventLoop2, ctx.executor().unwrap()); + + assertTrue(ctx.channel().isRegistered()); + ctx.executor().execute(NOOP); + ctx.channel().eventLoop().execute(NOOP); + + promise.setSuccess(); + } + } + + private static final class ExpectDeregisteredHandler extends ChannelHandlerAdapter { + + final AtomicBoolean handlerExecuted; + final EventLoop expectedEventLoop; + + ExpectDeregisteredHandler(AtomicBoolean handlerExecuted, EventLoop expectedEventLoop) { + this.handlerExecuted = handlerExecuted; + this.expectedEventLoop = expectedEventLoop; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + assertSame(expectedEventLoop, ctx.executor().unwrap()); + assertFalse(ctx.channel().isRegistered()); + + boolean success = false; + try { + ctx.channel().eventLoop().execute(NOEXEC); + success = true; + } catch (Throwable t) { + assertTrue(t instanceof RejectedExecutionException); + } + assertFalse(success); + + handlerExecuted.set(true); + super.channelRead(ctx, msg); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + assertSame(expectedEventLoop, ctx.executor().unwrap()); + + assertTrue(ctx.channel().isRegistered()); + ctx.executor().execute(NOOP); + ctx.channel().eventLoop().execute(NOOP); + + super.write(ctx, msg, promise); + } + } + + private static final class ReregisterHandler extends ChannelHandlerAdapter { + final AtomicBoolean handlerExecuted; + final EventLoop expectedEventLoop; + final EventLoop newEventLoop; + + ReregisterHandler(AtomicBoolean handlerExecuted, EventLoop expectedEventLoop, EventLoop newEventLoop) { + this.handlerExecuted = handlerExecuted; + this.expectedEventLoop = expectedEventLoop; + this.newEventLoop = newEventLoop; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + assertSame(expectedEventLoop, ctx.executor().unwrap()); + + assertFalse(ctx.channel().isRegistered()); + newEventLoop.register(ctx.channel()).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + assertTrue(ctx.channel().isRegistered()); + + ctx.executor().execute(NOOP); + ctx.channel().eventLoop().execute(NOOP); + + ctx.write(Unpooled.buffer(), ctx.channel().newPromise()).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + handlerExecuted.set(true); + } + }); + } + }); + + super.channelRead(ctx, msg); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + assertSame(expectedEventLoop, ctx.executor().unwrap()); + + assertTrue(ctx.channel().isRegistered()); + ctx.executor().execute(NOOP); + ctx.channel().eventLoop().execute(NOOP); + + super.write(ctx, msg, promise); + } + } +} diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 5897a83d59..4e54de55bd 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -21,6 +21,7 @@ import ch.qos.logback.core.Appender; import io.netty.channel.local.LocalChannel; import io.netty.util.concurrent.DefaultExecutorFactory; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.PausableEventExecutor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -32,10 +33,12 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -49,7 +52,9 @@ public class SingleThreadEventLoopTest { public void run() { } }; + @SuppressWarnings({ "unused" }) private SingleThreadEventLoopA loopA; + @SuppressWarnings({ "unused" }) private SingleThreadEventLoopB loopB; @Before @@ -176,17 +181,8 @@ public class SingleThreadEventLoopTest { assertTrue(f.cancel(true)); assertEquals(5, timestamps.size()); - // Check if the task was run without a lag. - Long previousTimestamp = null; - for (Long t: timestamps) { - if (previousTimestamp == null) { - previousTimestamp = t; - continue; - } - - assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(90)); - previousTimestamp = t; - } + // Check if the task was run without lag. + verifyTimestampDeltas(timestamps, 90); } @Test @@ -266,17 +262,8 @@ public class SingleThreadEventLoopTest { assertTrue(f.cancel(true)); assertEquals(3, timestamps.size()); - // Check if the task was run without a lag. - Long previousTimestamp = null; - for (Long t: timestamps) { - if (previousTimestamp == null) { - previousTimestamp = t; - continue; - } - - assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(150)); - previousTimestamp = t; - } + // Check if the task was run without lag. + verifyTimestampDeltas(timestamps, TimeUnit.MILLISECONDS.toNanos(150)); } @Test @@ -435,12 +422,276 @@ public class SingleThreadEventLoopTest { assertThat(loopA.isShutdown(), is(true)); } + @Test(timeout = 10000) + public void testScheduledTaskWakeupAfterDeregistration() throws Exception { + int numtasks = 5; + + final List> timestampsPerTask = new ArrayList>(numtasks); + final List scheduledFutures = new ArrayList(numtasks); + + // start the eventloops + loopA.execute(NOOP); + loopB.execute(NOOP); + + LocalChannel channel = new LocalChannel(); + ChannelPromise registerPromise = channel.newPromise(); + channel.unsafe().register(loopA, registerPromise); + registerPromise.sync(); + + for (int i = 0; i < numtasks; i++) { + Queue timestamps = new LinkedBlockingQueue(); + timestampsPerTask.add(timestamps); + scheduledFutures.add(channel.eventLoop() + .scheduleAtFixedRate(new TimestampsRunnable(timestamps), 0, 100, TimeUnit.MILLISECONDS)); + } + + // Each task should be executed every 100ms. + // Will give them additional 50ms to execute ten times. + Thread.sleep(50 + 10 * 100); + + // Deregister must stop future execution of scheduled tasks. + assertTrue(channel.deregister().sync().isSuccess()); + + for (Queue timestamps : timestampsPerTask) { + assertTrue(timestamps.size() >= 10); + verifyTimestampDeltas(timestamps, TimeUnit.MICROSECONDS.toNanos(90)); + timestamps.clear(); + } + + // Because the Channel is deregistered no task must have executed since then. + Thread.sleep(200); + for (Queue timestamps : timestampsPerTask) { + assertTrue(timestamps.isEmpty()); + } + + registerPromise = channel.newPromise(); + channel.unsafe().register(loopB, registerPromise); + registerPromise.sync(); + + // After the channel was registered with another eventloop the scheduled tasks should start executing again. + // Same as above. + Thread.sleep(50 + 10 * 100); + + // Cancel all scheduled tasks. + for (ScheduledFuture f : scheduledFutures) { + assertTrue(f.cancel(true)); + } + + for (Queue timestamps : timestampsPerTask) { + assertTrue(timestamps.size() >= 10); + verifyTimestampDeltas(timestamps, TimeUnit.MICROSECONDS.toNanos(90)); + } + } + + /** + * Runnable that adds the current nano time to a list whenever + * it's executed. + */ + private static class TimestampsRunnable implements Runnable { + + private Queue timestamps; + + TimestampsRunnable(Queue timestamps) { + assertNotNull(timestamps); + this.timestamps = timestamps; + } + + @Override + public void run() { + timestamps.add(System.nanoTime()); + } + } + + @Test(timeout = 10000) + public void testDeregisterRegisterMultipleTimesWithTasks() throws Exception { + final Queue timestamps = new LinkedBlockingQueue(); + // need a final counter, so it can be used in an inner class. + final AtomicInteger i = new AtomicInteger(-1); + + // start the eventloops + loopA.execute(NOOP); + loopB.execute(NOOP); + + LocalChannel channel = new LocalChannel(); + boolean firstRun = true; + ScheduledFuture f = null; + while (i.incrementAndGet() < 4) { + ChannelPromise registerPromise = channel.newPromise(); + channel.unsafe().register(i.intValue() % 2 == 0 ? loopA : loopB, registerPromise); + registerPromise.sync(); + + if (firstRun) { + f = channel.eventLoop().scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + assertTrue((i.intValue() % 2 == 0 ? loopA : loopB).inEventLoop()); + timestamps.add(System.nanoTime()); + } + }, 0, 100, TimeUnit.MILLISECONDS); + firstRun = false; + } + + Thread.sleep(250); + + assertTrue(channel.deregister().sync().isSuccess()); + + assertTrue("was " + timestamps.size(), timestamps.size() >= 2); + verifyTimestampDeltas(timestamps, TimeUnit.MILLISECONDS.toNanos(90)); + timestamps.clear(); + } + + // cancel while the channel is deregistered + assertFalse(channel.isRegistered()); + assertTrue(f.cancel(true)); + assertTrue(timestamps.isEmpty()); + + // register again and check that it's not executed again. + ChannelPromise registerPromise = channel.newPromise(); + channel.unsafe().register(loopA, registerPromise); + registerPromise.sync(); + + Thread.sleep(200); + + assertTrue(timestamps.isEmpty()); + } + + @Test(timeout = 10000) + public void testDeregisterWithScheduleWithFixedDelayTask() throws Exception { + testDeregisterWithPeriodicScheduleTask(PeriodicScheduleMethod.FIXED_DELAY); + } + + @Test(timeout = 10000) + public void testDeregisterWithScheduleAtFixedRateTask() throws Exception { + testDeregisterWithPeriodicScheduleTask(PeriodicScheduleMethod.FIXED_RATE); + } + + private void testDeregisterWithPeriodicScheduleTask(PeriodicScheduleMethod method) throws Exception { + final Queue timestamps = new LinkedBlockingQueue(); + + // start the eventloops + loopA.execute(NOOP); + loopB.execute(NOOP); + + LocalChannel channel = new LocalChannel(); + ChannelPromise registerPromise = channel.newPromise(); + channel.unsafe().register(loopA, registerPromise); + registerPromise.sync(); + + assertThat(channel.eventLoop(), instanceOf(PausableEventExecutor.class)); + assertSame(loopA, channel.eventLoop().unwrap()); + + ScheduledFuture scheduleFuture; + if (PeriodicScheduleMethod.FIXED_RATE.equals(method)) { + scheduleFuture = channel.eventLoop().scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + assertTrue(loopB.inEventLoop()); + timestamps.add(System.nanoTime()); + } + }, 100, 200, TimeUnit.MILLISECONDS); + } else { + scheduleFuture = channel.eventLoop().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + assertTrue(loopB.inEventLoop()); + timestamps.add(System.nanoTime()); + } + }, 100, 200, TimeUnit.MILLISECONDS); + } + + assertTrue(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks()); + ChannelFuture deregisterFuture = channel.deregister(); + assertFalse(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks()); + + assertTrue(deregisterFuture.sync().isSuccess()); + + timestamps.clear(); + Thread.sleep(1000); + + // no scheduled tasks must be executed after deregistration. + assertTrue("size: " + timestamps.size(), timestamps.isEmpty()); + + assertFalse(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks()); + registerPromise = channel.newPromise(); + channel.unsafe().register(loopB, registerPromise); + assertTrue(registerPromise.sync().isSuccess()); + assertTrue(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks()); + + assertThat(channel.eventLoop(), instanceOf(PausableEventExecutor.class)); + assertSame(loopB, channel.eventLoop().unwrap()); + + // 100ms internal delay + 1 second. Should be able to execute 5 tasks in that time. + Thread.sleep(1150); + assertTrue(scheduleFuture.cancel(true)); + + assertTrue("was " + timestamps.size(), timestamps.size() >= 5); + verifyTimestampDeltas(timestamps, TimeUnit.MILLISECONDS.toNanos(190)); + } + + private static enum PeriodicScheduleMethod { + FIXED_RATE, FIXED_DELAY + } + + private static void verifyTimestampDeltas(Queue timestamps, long minDelta) { + assertFalse(timestamps.isEmpty()); + long prev = timestamps.poll(); + for (Long timestamp : timestamps) { + long delta = timestamp - prev; + assertTrue(String.format("delta: %d, minDelta: %d", delta, minDelta), delta >= minDelta); + prev = timestamp; + } + } + + @Test(timeout = 10000) + public void testDeregisterWithScheduledTask() throws Exception { + final AtomicBoolean oneTimeScheduledTaskExecuted = new AtomicBoolean(false); + + // start the eventloops + loopA.execute(NOOP); + loopB.execute(NOOP); + + LocalChannel channel = new LocalChannel(); + ChannelPromise registerPromise = channel.newPromise(); + channel.unsafe().register(loopA, registerPromise); + registerPromise.sync(); + + assertThat(channel.eventLoop(), instanceOf(PausableEventExecutor.class)); + assertSame(loopA, ((PausableEventExecutor) channel.eventLoop()).unwrap()); + + io.netty.util.concurrent.ScheduledFuture scheduleFuture = channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + oneTimeScheduledTaskExecuted.set(true); + assertTrue(loopB.inEventLoop()); + } + }, 1, TimeUnit.SECONDS); + + assertTrue(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks()); + ChannelFuture deregisterFuture = channel.deregister(); + assertFalse(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks()); + + assertTrue(deregisterFuture.sync().isSuccess()); + + Thread.sleep(1000); + + registerPromise = channel.newPromise(); + assertFalse(oneTimeScheduledTaskExecuted.get()); + channel.unsafe().register(loopB, registerPromise); + registerPromise.sync(); + + assertThat(channel.eventLoop(), instanceOf(PausableEventExecutor.class)); + assertSame(loopB, ((PausableEventExecutor) channel.eventLoop()).unwrap()); + + assertTrue(scheduleFuture.sync().isSuccess()); + assertTrue(oneTimeScheduledTaskExecuted.get()); + } + private static class SingleThreadEventLoopA extends SingleThreadEventLoop { final AtomicInteger cleanedUp = new AtomicInteger(); SingleThreadEventLoopA() { - super(null, new DefaultExecutorFactory("A").newExecutor(1), true); + super(null, Executors.newSingleThreadExecutor(), true); } @Override @@ -470,7 +721,7 @@ public class SingleThreadEventLoopTest { private volatile boolean interrupted; SingleThreadEventLoopB() { - super(null, new DefaultExecutorFactory("B").newExecutor(1), false); + super(null, Executors.newSingleThreadExecutor(), false); } @Override diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java index 713187477c..d25213ea99 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java @@ -55,7 +55,7 @@ public class LocalTransportThreadModelTest2 { serverBootstrap.bind(new LocalAddress(LOCAL_CHANNEL)).sync(); int count = 100; - for (int i = 1; i < count + 1; i ++) { + for (int i = 1; i < count + 1; i++) { Channel ch = clientBootstrap.connect().sync().channel(); // SPIN until we get what we are looking for.