Deregistration of a Channel from an EventLoop

Motivation:

After a channel is created it's usually assigned to an
EventLoop. During the lifetime of a Channel the
EventLoop is then responsible for processing all I/O
and compute tasks of the Channel.

For various reasons (e.g. load balancing) a user might
require the ability for a Channel to be assigned to
another EventLoop during its lifetime.

Modifications:

Introduce under the hood changes that ensure that Netty's
thread model is obeyed during and after the deregistration
of a channel.

Ensure that tasks (one time and periodic) are executed by
the right EventLoop at all times.

Result:

A Channel can be deregistered from one and re-registered with
another EventLoop.
This commit is contained in:
Jakob Buchgraber 2014-08-11 16:01:32 +02:00 committed by Trustin Lee
parent f8bee2e94c
commit 220660e351
31 changed files with 1632 additions and 250 deletions

View File

@ -62,6 +62,11 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
return inEventLoop(Thread.currentThread());
}
@Override
public EventExecutor unwrap() {
return this;
}
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);

View File

@ -44,7 +44,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
CANCELLATION_CAUSE_HOLDER.cause.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}
private final EventExecutor executor;
EventExecutor executor;
private volatile Object result;

View File

@ -54,6 +54,25 @@ public interface EventExecutor extends EventExecutorGroup {
*/
boolean inEventLoop(Thread thread);
/**
* Returns an {@link EventExecutor} that is not a {@link WrappedEventExecutor}.
*
* <ul>
* <li>
* A {@link WrappedEventExecutor} implementing this method must return the underlying
* {@link EventExecutor} while making sure that it's not a {@link WrappedEventExecutor}
* (e.g. by multiple calls to {@link #unwrap()}).
* </li>
* <li>
* An {@link EventExecutor} that is not a {@link WrappedEventExecutor} must return a reference to itself.
* </li>
* <li>
* This method must not return null.
* </li>
* </ul>
*/
EventExecutor unwrap();
/**
* Return a new {@link Promise}.
*/

View File

@ -233,8 +233,8 @@ public final class GlobalEventExecutor extends AbstractEventExecutor {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
return schedule(new ScheduledFutureTask<Void>(this, delayedTaskQueue,
Executors.<Void>callable(command, null), ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
@Override

View File

@ -37,10 +37,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
/**
* @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
@ -50,10 +50,10 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
/**
* @param nEventExecutors the number of {@link EventExecutor}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventExecutor}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/

View File

@ -0,0 +1,41 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.concurrent.RejectedExecutionException;
/**
* Implement this interface if you need your {@link EventExecutor} implementation to be able
* to reject new work.
*/
public interface PausableEventExecutor extends EventExecutor, WrappedEventExecutor {
/**
* After a call to this method the {@link EventExecutor} may throw a {@link RejectedExecutionException} when
* attempting to assign new work to it (i.e. through a call to {@link EventExecutor#execute(Runnable)}).
*/
void rejectNewTasks();
/**
* With a call to this method the {@link EventExecutor} signals that it is now accepting new work.
*/
void acceptNewTasks();
/**
* Returns {@code true} if and only if this {@link EventExecutor} is accepting a new task.
*/
boolean isAcceptingNewTasks();
}

View File

@ -16,6 +16,9 @@
package io.netty.util.concurrent;
import io.netty.util.internal.CallableEventExecutorAdapter;
import io.netty.util.internal.OneTimeTask;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
@ -36,36 +39,26 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
}
private final long id = nextTaskId.getAndIncrement();
private final Queue<ScheduledFutureTask<?>> delayedTaskQueue;
private volatile Queue<ScheduledFutureTask<?>> delayedTaskQueue;
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
ScheduledFutureTask(
EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
Runnable runnable, V result, long nanoTime) {
this(executor, delayedTaskQueue, toCallable(runnable, result), nanoTime);
}
ScheduledFutureTask(
EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
Callable<V> callable, long nanoTime, long period) {
super(executor, callable);
ScheduledFutureTask(EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
Callable<V> callable, long nanoTime, long period) {
super(executor.unwrap(), callable);
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
this.delayedTaskQueue = delayedTaskQueue;
deadlineNanos = nanoTime;
periodNanos = period;
}
ScheduledFutureTask(
EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
Callable<V> callable, long nanoTime) {
super(executor, callable);
ScheduledFutureTask(EventExecutor executor, Queue<ScheduledFutureTask<?>> delayedTaskQueue,
Callable<V> callable, long nanoTime) {
super(executor.unwrap(), callable);
this.delayedTaskQueue = delayedTaskQueue;
deadlineNanos = nanoTime;
periodNanos = 0;
@ -73,7 +66,7 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
@Override
protected EventExecutor executor() {
return super.executor();
return executor;
}
public long deadlineNanos() {
@ -117,25 +110,41 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
@Override
public void run() {
assert executor().inEventLoop();
try {
if (periodNanos == 0) {
if (setUncancellableInternal()) {
V result = task.call();
setSuccessInternal(result);
if (isMigrationPending()) {
scheduleWithNewExecutor();
} else if (needsLaterExecution()) {
if (!executor().isShutdown()) {
// Try again in ten microseconds.
deadlineNanos = nanoTime() + TimeUnit.MICROSECONDS.toNanos(10);
if (!isCancelled()) {
delayedTaskQueue.add(this);
}
}
} else {
// check if is done as it may was cancelled
if (!isCancelled()) {
task.call();
if (!executor().isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
if (!isCancelled()) {
delayedTaskQueue.add(this);
// delayed tasks executed once
if (periodNanos == 0) {
if (setUncancellableInternal()) {
V result = task.call();
setSuccessInternal(result);
}
// periodically executed tasks
} else {
if (!isCancelled()) {
task.call();
if (!executor().isShutdown()) {
// repeat task at fixed rate
if (periodNanos > 0) {
deadlineNanos += periodNanos;
// repeat task with fixed delay
} else {
deadlineNanos = nanoTime() - periodNanos;
}
if (!isCancelled()) {
delayedTaskQueue.add(this);
}
}
}
}
@ -158,4 +167,48 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
buf.append(')');
return buf;
}
/**
* When this condition is met it usually means that the channel associated with this task
* was deregistered from its eventloop and has not yet been registered with another eventloop.
*/
private boolean needsLaterExecution() {
return task instanceof CallableEventExecutorAdapter &&
((CallableEventExecutorAdapter) task).executor() instanceof PausableEventExecutor &&
!((PausableEventExecutor) ((CallableEventExecutorAdapter) task).executor()).isAcceptingNewTasks();
}
/**
* When this condition is met it usually means that the channel associated with this task
* was re-registered with another eventloop, after having been deregistered beforehand.
*/
private boolean isMigrationPending() {
return !isCancelled() &&
task instanceof CallableEventExecutorAdapter &&
executor() != ((CallableEventExecutorAdapter) task).executor().unwrap();
}
private void scheduleWithNewExecutor() {
EventExecutor newExecutor = ((CallableEventExecutorAdapter) task).executor().unwrap();
if (newExecutor instanceof SingleThreadEventExecutor) {
if (!newExecutor.isShutdown()) {
executor = newExecutor;
delayedTaskQueue = ((SingleThreadEventExecutor) newExecutor).delayedTaskQueue;
executor.execute(new OneTimeTask() {
@Override
public void run() {
// Execute as soon as possible.
deadlineNanos = nanoTime();
if (!isCancelled()) {
delayedTaskQueue.add(ScheduledFutureTask.this);
}
}
});
}
} else {
throw new UnsupportedOperationException("task migration unsupported");
}
}
}

View File

@ -15,7 +15,9 @@
*/
package io.netty.util.concurrent;
import io.netty.util.internal.CallableEventExecutorAdapter;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.RunnableEventExecutorAdapter;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -128,7 +130,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it.
* @param executor the {@link Executor} which will be used for executing.
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up
* the executor thread.
* the executor thread.
*/
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
super(parent);
@ -731,7 +733,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
this, delayedTaskQueue, toCallable(command), ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
@Override
@ -768,7 +770,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
return schedule(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(command, null),
this, delayedTaskQueue, toCallable(command),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}
@ -790,7 +792,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
return schedule(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(command, null),
this, delayedTaskQueue, toCallable(command),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
@ -813,6 +815,14 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return task;
}
private static Callable<Void> toCallable(final Runnable command) {
if (command instanceof RunnableEventExecutorAdapter) {
return new RunnableToCallableAdapter((RunnableEventExecutorAdapter) command);
} else {
return Executors.<Void>callable(command, null);
}
}
protected void cleanupAndTerminate(boolean success) {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
@ -865,7 +875,7 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
}
protected void scheduleExecution() {
protected final void scheduleExecution() {
updateThread(null);
executor.execute(AS_RUNNABLE);
}
@ -886,4 +896,29 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
}
}
}
private static class RunnableToCallableAdapter implements CallableEventExecutorAdapter<Void> {
final RunnableEventExecutorAdapter runnable;
RunnableToCallableAdapter(RunnableEventExecutorAdapter runnable) {
this.runnable = runnable;
}
@Override
public EventExecutor executor() {
return runnable.executor();
}
@Override
public Callable unwrap() {
return null;
}
@Override
public Void call() throws Exception {
runnable.run();
return null;
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012 The Netty Project
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
@ -13,22 +13,15 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.AbstractEventLoopTest;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
package io.netty.util.concurrent;
public class NioEventLoopTest extends AbstractEventLoopTest {
/**
* A marker interface indicating that the {@link EventExecutor} is a wrapper around
* another {@link EventExecutor} implementation.
*
* See {@link EventExecutor#unwrap()} and {@link PausableEventExecutor} for further details.
*/
public interface WrappedEventExecutor extends EventExecutor {
@Override
protected EventLoopGroup newEventLoopGroup() {
return new NioEventLoopGroup();
}
@Override
protected Class<? extends ServerSocketChannel> newChannel() {
return NioServerSocketChannel.class;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.Callable;
/**
* Generic interface to wrap an {@link EventExecutor} and a {@link Callable}.
*
* This interface is used internally to ensure scheduled tasks are executed by
* the correct {@link EventExecutor} after channel migration.
*
* @see io.netty.util.concurrent.ScheduledFutureTask
* @see io.netty.util.concurrent.SingleThreadEventExecutor
*
* @param <V> the result type of method {@link Callable#call()}.
*/
public interface CallableEventExecutorAdapter<V> extends Callable<V> {
/**
* Returns the wrapped {@link EventExecutor}.
*/
EventExecutor executor();
/**
* Returns the wrapped {@link Callable}.
*/
Callable<V> unwrap();
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.concurrent.EventExecutor;
/**
* Generic interface to wrap an {@link EventExecutor} and a {@link Runnable}.
*
* This interface is used internally to ensure scheduled tasks are executed by
* the correct {@link EventExecutor} after channel migration.
*
* @see io.netty.util.concurrent.ScheduledFutureTask
* @see io.netty.util.concurrent.SingleThreadEventExecutor
*/
public interface RunnableEventExecutorAdapter extends Runnable {
/**
* @return the wrapped {@link EventExecutor}.
*/
EventExecutor executor();
/**
* @return the wrapped {@link Runnable}.
*/
Runnable unwrap();
}

View File

@ -98,7 +98,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
@Override
protected void doDeregister() throws Exception {
((EpollEventLoop) eventLoop()).remove(this);
((EpollEventLoop) eventLoop().unwrap()).remove(this);
}
@Override
@ -154,14 +154,13 @@ abstract class AbstractEpollChannel extends AbstractChannel {
private void modifyEvents() {
if (isOpen()) {
((EpollEventLoop) eventLoop()).modify(this);
((EpollEventLoop) eventLoop().unwrap()).modify(this);
}
}
@Override
protected void doRegister() throws Exception {
EpollEventLoop loop = (EpollEventLoop) eventLoop();
loop.add(this);
((EpollEventLoop) eventLoop().unwrap()).add(this);
}
@Override

View File

@ -24,12 +24,12 @@ import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A skeletal {@link Channel} implementation.
@ -59,7 +59,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile PausableChannelEventLoop eventLoop;
private volatile boolean registered;
/** Cache for the string representation of this channel */
@ -119,7 +119,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public EventLoop eventLoop() {
public final EventLoop eventLoop() {
EventLoop eventLoop = this.eventLoop;
if (eventLoop == null) {
throw new IllegalStateException("channel not registered to an event loop");
@ -198,6 +198,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public ChannelFuture deregister() {
/**
* One problem of channel deregistration is that after a channel has been deregistered
* there may still be tasks, created from within one of the channel's ChannelHandlers,
* in the {@link EventLoop}'s task queue. That way, an unfortunate twist of events could lead
* to tasks still being in the old {@link EventLoop}'s queue even after the channel has been
* registered with a new {@link EventLoop}. This would lead to the tasks being executed by two
* different {@link EventLoop}s.
*
* Our solution to this problem is to always perform the actual deregistration of
* the channel as a task and to reject any submission of new tasks, from within
* one of the channel's ChannelHandlers, until the channel is registered with
* another {@link EventLoop}. That way we can be sure that there are no more tasks regarding
* that particular channel after it has been deregistered (because the deregistration
* task is the last one.).
*
* This only works for one time tasks. To see how we handle periodic/delayed tasks have a look
* at {@link io.netty.util.concurrent.ScheduledFutureTask#run()}.
*
* Also see {@link HeadContext#deregister(ChannelHandlerContext, ChannelPromise)}.
*/
eventLoop.rejectNewTasks();
return pipeline.deregister();
}
@ -234,6 +255,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public ChannelFuture deregister(ChannelPromise promise) {
eventLoop.rejectNewTasks();
return pipeline.deregister(promise);
}
@ -401,7 +423,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public final ChannelHandlerInvoker invoker() {
return eventLoop().asInvoker();
// return the unwrapped invoker.
return ((PausableChannelEventLoop) eventLoop().asInvoker()).unwrapInvoker();
}
@Override
@ -424,6 +447,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (promise == null) {
throw new NullPointerException("promise");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
@ -434,7 +460,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// It's necessary to reuse the wrapped eventloop object. Otherwise the user will end up with multiple
// objects that do not share a common state.
if (AbstractChannel.this.eventLoop == null) {
AbstractChannel.this.eventLoop = new PausableChannelEventLoop(eventLoop);
} else {
AbstractChannel.this.eventLoop.unwrapped = eventLoop;
}
if (eventLoop.inEventLoop()) {
register0(promise);
@ -466,6 +498,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
doRegister();
registered = true;
AbstractChannel.this.eventLoop.acceptNewTasks();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
@ -587,17 +620,22 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
if (wasActive && !isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelInactive();
deregister(voidPromise());
}
});
} else {
invokeLater(new OneTimeTask() {
@Override
public void run() {
deregister(voidPromise());
}
});
}
deregister(voidPromise());
}
}
@ -610,6 +648,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
/**
* This method must NEVER be called directly, but be executed as an
* extra task with a clean call stack instead. The reason for this
* is that this method calls {@link ChannelPipeline#fireChannelUnregistered()}
* directly, which might lead to an unfortunate nesting of independent inbound/outbound
* events. See the comments in {@link #invokeLater(Runnable)} for more details.
*/
@Override
public final void deregister(final ChannelPromise promise) {
if (!promise.setUncancellable()) {
@ -624,17 +669,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try {
doDeregister();
} catch (Throwable t) {
safeSetFailure(promise, t);
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (registered) {
registered = false;
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelUnregistered();
}
});
safeSetSuccess(promise);
pipeline.fireChannelUnregistered();
} else {
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
@ -792,7 +833,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
//
// which means the execution of two inbound handler methods of the same handler overlap undesirably.
eventLoop().execute(task);
eventLoop().unwrap().execute(task);
} catch (RejectedExecutionException e) {
logger.warn("Can't invoke task later as EventLoop rejected it", e);
}
@ -895,4 +936,70 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return super.trySuccess();
}
}
private final class PausableChannelEventLoop
extends PausableChannelEventExecutor implements EventLoop {
volatile boolean isAcceptingNewTasks = true;
volatile EventLoop unwrapped;
PausableChannelEventLoop(EventLoop unwrapped) {
this.unwrapped = unwrapped;
}
@Override
public void rejectNewTasks() {
isAcceptingNewTasks = false;
}
@Override
public void acceptNewTasks() {
isAcceptingNewTasks = true;
}
@Override
public boolean isAcceptingNewTasks() {
return isAcceptingNewTasks;
}
@Override
public EventLoopGroup parent() {
return unwrap().parent();
}
@Override
public EventLoop next() {
return unwrap().next();
}
@Override
public EventLoop unwrap() {
return unwrapped;
}
@Override
public ChannelHandlerInvoker asInvoker() {
return this;
}
@Override
public ChannelFuture register(Channel channel) {
return unwrap().register(channel);
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return unwrap().register(channel, promise);
}
@Override
Channel channel() {
return AbstractChannel.this;
}
@Override
ChannelHandlerInvoker unwrapInvoker() {
return unwrapped.asInvoker();
}
}
}

View File

@ -22,13 +22,22 @@ import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PausableEventExecutor;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import java.util.WeakHashMap;
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
/**
* Abstract base class for {@link ChannelHandlerContext} implementations.
* <p>
* The unusual is-a relationship with {@link PausableChannelEventExecutor} was put in place to avoid
* additional object allocations that would have been required to wrap the return values of {@link #executor()}
* and {@link #invoker()}.
*/
abstract class AbstractChannelHandlerContext
extends PausableChannelEventExecutor implements ChannelHandlerContext, ResourceLeakHint {
// This class keeps an integer member field 'skipFlags' whose each bit tells if the corresponding handler method
// is annotated with @Skip. 'skipFlags' is retrieved in runtime via the reflection API and is cached.
@ -237,7 +246,11 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
if (executor.inEventLoop()) {
teardown0();
} else {
executor.execute(new Runnable() {
/**
* use unwrap(), because the executor will usually be a {@link PausableEventExecutor}
* that might not accept any new tasks.
*/
executor().unwrap().execute(new OneTimeTask() {
@Override
public void run() {
teardown0();
@ -257,7 +270,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
}
@Override
public Channel channel() {
public final Channel channel() {
return channel;
}
@ -272,16 +285,13 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
}
@Override
public EventExecutor executor() {
return invoker().executor();
public final EventExecutor executor() {
return this;
}
@Override
public ChannelHandlerInvoker invoker() {
if (invoker == null) {
return channel.unsafe().invoker();
}
return invoker;
public final ChannelHandlerInvoker invoker() {
return this;
}
@Override
@ -543,4 +553,36 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
public String toString() {
return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')';
}
@Override
public EventExecutor unwrap() {
return unwrapInvoker().executor();
}
@Override
public void rejectNewTasks() {
/**
* This cast is correct because {@link #channel()} always returns an {@link AbstractChannel} and
* {@link AbstractChannel#eventLoop()} always returns a {@link PausableEventExecutor}.
*/
((PausableEventExecutor) channel().eventLoop()).rejectNewTasks();
}
@Override
public void acceptNewTasks() {
((PausableEventExecutor) channel().eventLoop()).acceptNewTasks();
}
@Override
public boolean isAcceptingNewTasks() {
return ((PausableEventExecutor) channel().eventLoop()).isAcceptingNewTasks();
}
@Override
ChannelHandlerInvoker unwrapInvoker() {
if (invoker == null) {
return channel().unsafe().invoker();
}
return invoker;
}
}

View File

@ -38,4 +38,9 @@ public abstract class AbstractEventLoop extends AbstractEventExecutor implements
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public EventLoop unwrap() {
return this;
}
}

View File

@ -24,6 +24,8 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.util.AttributeMap;
import io.netty.util.concurrent.FutureListener;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -281,7 +283,7 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
ChannelFuture close();
/**
* Request to deregister this {@link Channel} from the previous assigned {@link EventLoop} and notify the
* Request to deregister this {@link Channel} from its assigned {@link EventLoop} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* an error.
* <p>
@ -289,7 +291,19 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
* {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*
* <p>
* After this method completes (not the {@link ChannelFuture}!) one can not submit new tasks to the
* {@link Channel}'s {@link EventLoop} until the {@link Channel} is again registered with an {@link EventLoop}.
* Any attempt to do so will result in a {@link RejectedExecutionException} being thrown.
* Any tasks that were submitted before the call to {@link #deregister()} will finish before the
* {@link ChannelFuture} completes. Furthermore, periodic and delayed tasks will not be executed until the
* {@link Channel} is registered with an {@link EventLoop} again. Theses are tasks submitted
* to the {@link EventLoop} via one of the methods declared by {@link ScheduledExecutorService}.
* Please note that all of the above only applies to tasks created from within the deregistered {@link Channel}'s
* {@link ChannelHandler}s.
* <p>
* It's only safe to {@linkplain EventLoop#register(Channel)} the {@link Channel} with another (or the same)
* {@link EventLoop} after the {@link ChannelFuture} has completed.
*/
ChannelFuture deregister();
@ -367,16 +381,27 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
ChannelFuture close(ChannelPromise promise);
/**
* Request to deregister this {@link Channel} from the previous assigned {@link EventLoop} and notify the
* {@link ChannelFuture} once the operation completes, either because the operation was successful or because of
* Request to deregister this {@link Channel} from its assigned {@link EventLoop} and notify the
* {@link ChannelPromise} once the operation completes, either because the operation was successful or because of
* an error.
*
* The given {@link ChannelPromise} will be notified.
* <p>
* This will result in having the
* {@link ChannelHandler#deregister(ChannelHandlerContext, ChannelPromise)}
* method called of the next {@link ChannelHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
* <p>
* After this method completes (not the {@link ChannelPromise}!) one can not submit new tasks to the
* {@link Channel}'s {@link EventLoop} until the {@link Channel} is again registered with an {@link EventLoop}.
* Any attempt to do so will result in a {@link RejectedExecutionException} being thrown.
* Any tasks that were submitted before the call to {@link #deregister()} will finish before the
* {@link ChannelPromise} completes. Furthermore, periodic and delayed tasks will not be executed until the
* {@link Channel} is registered with an {@link EventLoop} again. Theses are tasks submitted
* to the {@link EventLoop} via one of the methods declared by {@link ScheduledExecutorService}.
* Please note that all of the above only applies to tasks created from within the deregistered {@link Channel}'s
* {@link ChannelHandler}s.
* <p>
* It's only safe to {@linkplain EventLoop#register(Channel)} the {@link Channel} with another (or the same)
* {@link EventLoop} after the {@link ChannelPromise} has completed.
*/
ChannelFuture deregister(ChannelPromise promise);
@ -459,6 +484,11 @@ public interface Channel extends AttributeMap, Comparable<Channel> {
/**
* Register the {@link Channel} of the {@link ChannelPromise} and notify
* the {@link ChannelFuture} once the registration was complete.
* <p>
* It's only safe to submit a new task to the {@link EventLoop} from within a
* {@link ChannelHandler} once the {@link ChannelPromise} succeeded. Otherwise
* the task may or may not be rejected.
* </p>
*/
void register(EventLoop eventLoop, ChannelPromise promise);

View File

@ -19,6 +19,8 @@ import io.netty.channel.Channel.Unsafe;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.PausableEventExecutor;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -61,7 +63,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext tail;
private final Map<String, AbstractChannelHandlerContext> name2ctx =
new HashMap<String, AbstractChannelHandlerContext>(4);
new HashMap<String, AbstractChannelHandlerContext>(4);
/**
* @see #findInvoker(EventExecutorGroup)
@ -420,15 +422,15 @@ final class DefaultChannelPipeline implements ChannelPipeline {
remove0(ctx);
return ctx;
} else {
future = ctx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
remove0(ctx);
}
}
});
context = ctx;
future = ctx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
remove0(ctx);
}
}
});
context = ctx;
}
}
@ -620,7 +622,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
}
});
return;
}
@ -1188,8 +1190,16 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
public void deregister(ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
assert !((PausableEventExecutor) ctx.channel().eventLoop()).isAcceptingNewTasks();
// submit deregistration task
ctx.channel().eventLoop().unwrap().execute(new OneTimeTask() {
@Override
public void run() {
unsafe.deregister(promise);
}
});
}
@Override

View File

@ -36,10 +36,10 @@ public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {
/**
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
*/
public DefaultEventLoopGroup(int nEventLoops) {
this(nEventLoops, (Executor) null);
@ -47,10 +47,10 @@ public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {
/**
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executor the {@link Executor} to use, or {@code null} if the default should be used.
*/
public DefaultEventLoopGroup(int nEventLoops, Executor executor) {
@ -59,10 +59,10 @@ public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {
/**
* @param nEventLoops the number of {@link EventLoop}s that will be used by this instance.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* If {@code executor} is {@code null} this number will also be the parallelism
* requested from the default executor. It is generally advised for the number
* of {@link EventLoop}s and the number of {@link Thread}s used by the
* {@code executor} to lie very close together.
* @param executorFactory the {@link ExecutorFactory} to use, or {@code null} if the default should be used.
*/
public DefaultEventLoopGroup(int nEventLoops, ExecutorFactory executorFactory) {

View File

@ -29,6 +29,9 @@ public interface EventLoop extends EventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
@Override
EventLoop unwrap();
/**
* Creates a new default {@link ChannelHandlerInvoker} implementation that uses this {@link EventLoop} to
* invoke event handler methods.

View File

@ -27,14 +27,25 @@ public interface EventLoopGroup extends EventExecutorGroup {
EventLoop next();
/**
* Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
* will get notified once the registration was complete.
* Register a {@link Channel} with an {@link EventLoop} from this {@link EventLoopGroup}. The returned
* {@link ChannelFuture} will get notified once the registration is completed.
* <p>
* It's only safe to submit a new task to the {@link EventLoop} from within a
* {@link ChannelHandler} once the {@link ChannelPromise} succeeded. Otherwise
* the task may or may not be rejected.
* </p>
*/
ChannelFuture register(Channel channel);
/**
* Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
* will get notified once the registration was complete and also will get returned.
* Register a {@link Channel} with an {@link EventLoop} from this {@link EventLoopGroup}. The provided
* {@link ChannelPromise} will get notified once the registration is completed. The returned {@link ChannelFuture}
* is the same {@link ChannelPromise} that was passed to the method.
* <p>
* It's only safe to submit a new task to the {@link EventLoop} from within a
* {@link ChannelHandler} once the {@link ChannelPromise} succeeded. Otherwise
* the task may or may not be rejected.
* </p>
*/
ChannelFuture register(Channel channel, ChannelPromise promise);
}

View File

@ -0,0 +1,380 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.internal.CallableEventExecutorAdapter;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.PausableEventExecutor;
import io.netty.util.concurrent.ProgressivePromise;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.RunnableEventExecutorAdapter;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
abstract class PausableChannelEventExecutor implements PausableEventExecutor, ChannelHandlerInvoker {
abstract Channel channel();
abstract ChannelHandlerInvoker unwrapInvoker();
@Override
public void invokeFlush(ChannelHandlerContext ctx) {
unwrapInvoker().invokeFlush(ctx);
}
@Override
public EventExecutor executor() {
return this;
}
@Override
public void invokeChannelRegistered(ChannelHandlerContext ctx) {
unwrapInvoker().invokeChannelRegistered(ctx);
}
@Override
public void invokeChannelUnregistered(ChannelHandlerContext ctx) {
unwrapInvoker().invokeChannelUnregistered(ctx);
}
@Override
public void invokeChannelActive(ChannelHandlerContext ctx) {
unwrapInvoker().invokeChannelActive(ctx);
}
@Override
public void invokeChannelInactive(ChannelHandlerContext ctx) {
unwrapInvoker().invokeChannelInactive(ctx);
}
@Override
public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
unwrapInvoker().invokeExceptionCaught(ctx, cause);
}
@Override
public void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event) {
unwrapInvoker().invokeUserEventTriggered(ctx, event);
}
@Override
public void invokeChannelRead(ChannelHandlerContext ctx, Object msg) {
unwrapInvoker().invokeChannelRead(ctx, msg);
}
@Override
public void invokeChannelReadComplete(ChannelHandlerContext ctx) {
unwrapInvoker().invokeChannelReadComplete(ctx);
}
@Override
public void invokeChannelWritabilityChanged(ChannelHandlerContext ctx) {
unwrapInvoker().invokeChannelWritabilityChanged(ctx);
}
@Override
public void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unwrapInvoker().invokeBind(ctx, localAddress, promise);
}
@Override
public void invokeConnect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
unwrapInvoker().invokeConnect(ctx, remoteAddress, localAddress, promise);
}
@Override
public void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
unwrapInvoker().invokeDisconnect(ctx, promise);
}
@Override
public void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise) {
unwrapInvoker().invokeClose(ctx, promise);
}
@Override
public void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise) {
unwrapInvoker().invokeDeregister(ctx, promise);
}
@Override
public void invokeRead(ChannelHandlerContext ctx) {
unwrapInvoker().invokeRead(ctx);
}
@Override
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unwrapInvoker().invokeWrite(ctx, msg, promise);
}
@Override
public EventExecutor next() {
return unwrap().next();
}
@Override
public <E extends EventExecutor> Set<E> children() {
return unwrap().children();
}
@Override
public EventExecutorGroup parent() {
return unwrap().parent();
}
@Override
public boolean inEventLoop() {
return unwrap().inEventLoop();
}
@Override
public boolean inEventLoop(Thread thread) {
return unwrap().inEventLoop(thread);
}
@Override
public <V> Promise<V> newPromise() {
return unwrap().newPromise();
}
@Override
public <V> ProgressivePromise<V> newProgressivePromise() {
return unwrap().newProgressivePromise();
}
@Override
public <V> Future<V> newSucceededFuture(V result) {
return unwrap().newSucceededFuture(result);
}
@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
return unwrap().newFailedFuture(cause);
}
@Override
public boolean isShuttingDown() {
return unwrap().isShuttingDown();
}
@Override
public Future<?> shutdownGracefully() {
return unwrap().shutdownGracefully();
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return unwrap().shutdownGracefully(quietPeriod, timeout, unit);
}
@Override
public Future<?> terminationFuture() {
return unwrap().terminationFuture();
}
@Override
@Deprecated
public void shutdown() {
unwrap().terminationFuture();
}
@Override
@Deprecated
public List<Runnable> shutdownNow() {
return unwrap().shutdownNow();
}
@Override
public Future<?> submit(Runnable task) {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().submit(task);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().schedule(new ChannelRunnableEventExecutor(channel(), command), delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().schedule(new ChannelCallableEventExecutor<V>(channel(), callable), delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().scheduleAtFixedRate(
new ChannelRunnableEventExecutor(channel(), command), initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().scheduleWithFixedDelay(
new ChannelRunnableEventExecutor(channel(), command), initialDelay, delay, unit);
}
@Override
public boolean isShutdown() {
return unwrap().isShutdown();
}
@Override
public boolean isTerminated() {
return unwrap().isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return unwrap().awaitTermination(timeout, unit);
}
@Override
public <T> List<java.util.concurrent.Future<T>>
invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>>
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
return unwrap().invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
if (!isAcceptingNewTasks()) {
throw new RejectedExecutionException();
}
unwrap().execute(command);
}
private static final class ChannelCallableEventExecutor<V> implements CallableEventExecutorAdapter<V> {
final Channel channel;
final Callable<V> callable;
ChannelCallableEventExecutor(Channel channel, Callable<V> callable) {
this.channel = channel;
this.callable = callable;
}
@Override
public EventExecutor executor() {
return channel.eventLoop();
}
@Override
public Callable unwrap() {
return callable;
}
@Override
public V call() throws Exception {
return callable.call();
}
}
private static final class ChannelRunnableEventExecutor implements RunnableEventExecutorAdapter {
final Channel channel;
final Runnable runnable;
ChannelRunnableEventExecutor(Channel channel, Runnable runnable) {
this.channel = channel;
this.runnable = runnable;
}
@Override
public EventExecutor executor() {
return channel.eventLoop();
}
@Override
public Runnable unwrap() {
return runnable;
}
@Override
public void run() {
runnable.run();
}
}
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.util.concurrent.Executor;
@ -70,6 +71,11 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
return !(task instanceof NonWakeupRunnable);
}
@Override
public EventLoop unwrap() {
return this;
}
/**
* Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
*/

View File

@ -190,7 +190,7 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle
}
@Override
public void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise) {
public void invokeDeregister(ChannelHandlerContext ctx, final ChannelPromise promise) {
invokeDeregisterNow(ctx, promise);
}

View File

@ -27,7 +27,6 @@ import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.InternalThreadLocalMap;
import java.net.SocketAddress;
@ -182,7 +181,7 @@ public class LocalChannel extends AbstractChannel {
}
});
}
((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
((SingleThreadEventLoop) eventLoop().unwrap()).addShutdownHook(shutdownHook);
}
@Override
@ -238,7 +237,7 @@ public class LocalChannel extends AbstractChannel {
@Override
protected void doDeregister() throws Exception {
// Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
((SingleThreadEventLoop) eventLoop().unwrap()).removeShutdownHook(shutdownHook);
}
@Override

View File

@ -22,7 +22,6 @@ import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.net.SocketAddress;
import java.util.ArrayDeque;
@ -83,7 +82,7 @@ public class LocalServerChannel extends AbstractServerChannel {
@Override
protected void doRegister() throws Exception {
((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
((SingleThreadEventLoop) eventLoop().unwrap()).addShutdownHook(shutdownHook);
}
@Override
@ -106,7 +105,7 @@ public class LocalServerChannel extends AbstractServerChannel {
@Override
protected void doDeregister() throws Exception {
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
((SingleThreadEventLoop) eventLoop().unwrap()).removeShutdownHook(shutdownHook);
}
@Override
@ -138,10 +137,10 @@ public class LocalServerChannel extends AbstractServerChannel {
serve0(child);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
serve0(child);
}
@Override
public void run() {
serve0(child);
}
});
}
return child;

View File

@ -105,11 +105,6 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return ch;
}
@Override
public NioEventLoop eventLoop() {
return (NioEventLoop) super.eventLoop();
}
/**
* Return the current {@link SelectionKey}
*/
@ -337,13 +332,13 @@ public abstract class AbstractNioChannel extends AbstractChannel {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
((NioEventLoop) eventLoop().unwrap()).selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
@ -356,7 +351,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
((NioEventLoop) eventLoop().unwrap()).cancel(selectionKey());
}
@Override

View File

@ -18,13 +18,13 @@ package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ExecutorFactory;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executor;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.ExecutorFactory;
/**
* A {@link MultithreadEventLoopGroup} implementation which is used for NIO {@link Selector} based {@link Channel}s.

View File

@ -1,72 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.Test;
import static org.junit.Assert.*;
public abstract class AbstractEventLoopTest {
/**
* Test for https://github.com/netty/netty/issues/803
*/
@Test
public void testReregister() {
EventLoopGroup group = newEventLoopGroup();
EventLoopGroup group2 = newEventLoopGroup();
final EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(2);
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelFuture future = bootstrap.channel(newChannel()).group(group)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
}).handler(new ChannelInitializer<ServerSocketChannel>() {
@Override
public void initChannel(ServerSocketChannel ch) throws Exception {
ch.pipeline().addLast(new TestChannelHandler());
ch.pipeline().addLast(eventExecutorGroup, new TestChannelHandler2());
}
})
.bind(0).awaitUninterruptibly();
EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor();
EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor();
future.channel().deregister().awaitUninterruptibly();
Channel channel = group2.register(future.channel()).awaitUninterruptibly().channel();
EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor();
assertNotSame(executor1, executorNew);
assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor());
}
private static final class TestChannelHandler extends ChannelHandlerAdapter { }
private static final class TestChannelHandler2 extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { }
}
protected abstract EventLoopGroup newEventLoopGroup();
protected abstract Class<? extends ServerSocketChannel> newChannel();
}

View File

@ -0,0 +1,389 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.PausableEventExecutor;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.*;
/**
* These tests should work with any {@link SingleThreadEventLoop} implementation. We chose the
* {@link io.netty.channel.nio.NioEventLoop} because it's the most commonly used {@link EventLoop}.
*/
public class ChannelDeregistrationTest {
private static final Runnable NOEXEC = new Runnable() {
@Override
public void run() {
fail();
}
};
private static final Runnable NOOP = new Runnable() {
@Override
public void run() {
}
};
/**
* Test deregistration and re-registration of a {@link Channel} within a {@link ChannelHandler}.
*
* The first {@link ChannelHandler} in the {@link ChannelPipeline} deregisters the {@link Channel} in the
* {@linkplain ChannelHandler#channelRead(ChannelHandlerContext, Object)} method, while
* the subsequent {@link ChannelHandler}s make sure that the {@link Channel} really is deregistered. The last
* {@link ChannelHandler} registers the {@link Channel} with a new {@link EventLoop} and triggers a
* {@linkplain ChannelHandler#write(ChannelHandlerContext, Object, ChannelPromise)} event, that is then
* used by all {@link ChannelHandler}s to ensure that the {@link Channel} was correctly registered with the
* new {@link EventLoop}.
*
* Most of the {@link ChannelHandler}s in the pipeline are assigned custom {@link EventExecutorGroup}s.
* It's important to make sure that they are preserved during and after
* {@linkplain io.netty.channel.Channel#deregister()}.
*/
@Test(timeout = 2000)
public void testDeregisterFromDifferentEventExecutorGroup() throws Exception {
final AtomicBoolean handlerExecuted1 = new AtomicBoolean();
final AtomicBoolean handlerExecuted2 = new AtomicBoolean();
final AtomicBoolean handlerExecuted3 = new AtomicBoolean();
final AtomicBoolean handlerExecuted4 = new AtomicBoolean();
final AtomicBoolean handlerExecuted5 = new AtomicBoolean();
final EventLoopGroup group1 = new NioEventLoopGroup(1);
final EventLoopGroup group2 = new NioEventLoopGroup(1);
final EventLoopGroup group3 = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
Channel serverChannel = serverBootstrap.group(group1)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// Deregister the Channel from the EventLoop group1.
ch.pipeline().addLast(new DeregisterHandler(handlerExecuted1, group1.next(), group2.next()));
// Ensure that the Channel is deregistered from EventLoop group1. Also make
// sure that despite deregistration the ChannelHandler is executed by the
// specified EventLoop.
ch.pipeline().addLast(group2, new ExpectDeregisteredHandler(handlerExecuted2, group2.next()));
ch.pipeline().addLast(group3, new ExpectDeregisteredHandler(handlerExecuted3, group3.next()));
ch.pipeline().addLast(group2, new ExpectDeregisteredHandler(handlerExecuted4, group2.next()));
// Register the Channel with EventLoop group2.
ch.pipeline().addLast(group3,
new ReregisterHandler(handlerExecuted5, group3.next(), group2.next()));
}
}).bind(0).sync().channel();
SocketAddress address = serverChannel.localAddress();
Socket s = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) address).getPort());
// write garbage just so to get channelRead(...) invoked.
s.getOutputStream().write(1);
while (!(handlerExecuted1.get() &&
handlerExecuted2.get() &&
handlerExecuted3.get() &&
handlerExecuted4.get() &&
handlerExecuted5.get())) {
Thread.sleep(10);
}
s.close();
serverChannel.close();
group1.shutdownGracefully();
group2.shutdownGracefully();
group3.shutdownGracefully();
}
/**
* Make sure the {@link EventLoop} and {@link ChannelHandlerInvoker} accessible from within a
* {@link ChannelHandler} are wrapped by a {@link PausableEventExecutor}.
*/
@Test(timeout = 2000)
public void testWrappedEventLoop() throws Exception {
final AtomicBoolean channelActiveCalled1 = new AtomicBoolean();
final AtomicBoolean channelActiveCalled2 = new AtomicBoolean();
final EventLoopGroup group1 = new NioEventLoopGroup();
final EventLoopGroup group2 = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
Channel serverChannel = serverBootstrap.group(group1)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TestChannelHandler3(channelActiveCalled1));
ch.pipeline().addLast(group2, new TestChannelHandler4(channelActiveCalled2));
}
}).bind(0).sync().channel();
SocketAddress address = serverChannel.localAddress();
Socket client = new Socket(NetUtil.LOCALHOST, ((InetSocketAddress) address).getPort());
while (!(channelActiveCalled1.get() && channelActiveCalled2.get())) {
Thread.sleep(10);
}
client.close();
serverChannel.close();
group1.shutdownGracefully();
group2.shutdownGracefully();
}
/**
* Test for https://github.com/netty/netty/issues/803
*/
@Test
public void testReregister() throws Exception {
final EventLoopGroup group1 = new NioEventLoopGroup();
final EventLoopGroup group2 = new NioEventLoopGroup();
final EventExecutorGroup group3 = new DefaultEventExecutorGroup(2);
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelFuture future = bootstrap.channel(NioServerSocketChannel.class).group(group1)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
}).handler(new ChannelInitializer<ServerSocketChannel>() {
@Override
public void initChannel(ServerSocketChannel ch) throws Exception {
ch.pipeline().addLast(new TestChannelHandler1());
ch.pipeline().addLast(group3, new TestChannelHandler2());
}
})
.bind(0).awaitUninterruptibly();
EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler1.class).executor();
EventExecutor unwrapped1 = executor1.unwrap();
EventExecutor executor3 = future.channel().pipeline().context(TestChannelHandler2.class).executor();
future.channel().deregister().sync();
Channel channel = group2.register(future.channel()).sync().channel();
EventExecutor executor2 = channel.pipeline().context(TestChannelHandler1.class).executor();
// same wrapped executor
assertSame(executor1, executor2);
// different executor under the wrapper
assertNotSame(unwrapped1, executor2.unwrap());
// executor3 must remain unchanged
assertSame(executor3.unwrap(), future.channel().pipeline()
.context(TestChannelHandler2.class)
.executor()
.unwrap());
}
private static final class TestChannelHandler1 extends ChannelHandlerAdapter { }
private static final class TestChannelHandler2 extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { }
}
private static final class TestChannelHandler3 extends ChannelHandlerAdapter {
AtomicBoolean channelActiveCalled;
TestChannelHandler3(AtomicBoolean channelActiveCalled) {
this.channelActiveCalled = channelActiveCalled;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channelActiveCalled.set(true);
assertTrue(ctx.executor() instanceof PausableEventExecutor);
assertTrue(ctx.channel().eventLoop() instanceof PausableEventExecutor);
assertTrue(ctx.invoker().executor() instanceof PausableEventExecutor);
assertTrue(ctx.channel().eventLoop().asInvoker().executor() instanceof PausableEventExecutor);
assertSame(ctx.executor().unwrap(), ctx.channel().eventLoop().unwrap());
super.channelActive(ctx);
}
}
private static final class TestChannelHandler4 extends ChannelHandlerAdapter {
AtomicBoolean channelActiveCalled;
TestChannelHandler4(AtomicBoolean channelActiveCalled) {
this.channelActiveCalled = channelActiveCalled;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channelActiveCalled.set(true);
assertTrue(ctx.executor() instanceof PausableEventExecutor);
assertTrue(ctx.channel().eventLoop() instanceof PausableEventExecutor);
assertTrue(ctx.invoker().executor() instanceof PausableEventExecutor);
assertTrue(ctx.channel().eventLoop().asInvoker().executor() instanceof PausableEventExecutor);
// This is executed by its own invoker, which has to be wrapped by
// a separate PausableEventExecutor.
assertNotSame(ctx.executor(), ctx.channel().eventLoop());
assertNotSame(ctx.executor().unwrap(), ctx.channel().eventLoop().unwrap());
super.channelActive(ctx);
}
}
private static final class DeregisterHandler extends ChannelHandlerAdapter {
final AtomicBoolean handlerExecuted;
final EventLoop expectedEventLoop1;
final EventLoop expectedEventLoop2;
DeregisterHandler(AtomicBoolean handlerExecuted, EventLoop expectedEventLoop1, EventLoop expectedEventLoop2) {
this.handlerExecuted = handlerExecuted;
this.expectedEventLoop1 = expectedEventLoop1;
this.expectedEventLoop2 = expectedEventLoop2;
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
assertSame(expectedEventLoop1, ctx.executor().unwrap());
assertTrue(ctx.channel().isRegistered());
ctx.channel().deregister().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assertFalse(ctx.channel().isRegistered());
boolean success = false;
try {
ctx.channel().eventLoop().execute(NOEXEC);
success = true;
} catch (Throwable t) {
assertTrue(t instanceof RejectedExecutionException);
}
assertFalse(success);
handlerExecuted.set(true);
ctx.fireChannelRead(msg);
}
});
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
assertSame(expectedEventLoop2, ctx.executor().unwrap());
assertTrue(ctx.channel().isRegistered());
ctx.executor().execute(NOOP);
ctx.channel().eventLoop().execute(NOOP);
promise.setSuccess();
}
}
private static final class ExpectDeregisteredHandler extends ChannelHandlerAdapter {
final AtomicBoolean handlerExecuted;
final EventLoop expectedEventLoop;
ExpectDeregisteredHandler(AtomicBoolean handlerExecuted, EventLoop expectedEventLoop) {
this.handlerExecuted = handlerExecuted;
this.expectedEventLoop = expectedEventLoop;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assertSame(expectedEventLoop, ctx.executor().unwrap());
assertFalse(ctx.channel().isRegistered());
boolean success = false;
try {
ctx.channel().eventLoop().execute(NOEXEC);
success = true;
} catch (Throwable t) {
assertTrue(t instanceof RejectedExecutionException);
}
assertFalse(success);
handlerExecuted.set(true);
super.channelRead(ctx, msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
assertSame(expectedEventLoop, ctx.executor().unwrap());
assertTrue(ctx.channel().isRegistered());
ctx.executor().execute(NOOP);
ctx.channel().eventLoop().execute(NOOP);
super.write(ctx, msg, promise);
}
}
private static final class ReregisterHandler extends ChannelHandlerAdapter {
final AtomicBoolean handlerExecuted;
final EventLoop expectedEventLoop;
final EventLoop newEventLoop;
ReregisterHandler(AtomicBoolean handlerExecuted, EventLoop expectedEventLoop, EventLoop newEventLoop) {
this.handlerExecuted = handlerExecuted;
this.expectedEventLoop = expectedEventLoop;
this.newEventLoop = newEventLoop;
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
assertSame(expectedEventLoop, ctx.executor().unwrap());
assertFalse(ctx.channel().isRegistered());
newEventLoop.register(ctx.channel()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assertTrue(ctx.channel().isRegistered());
ctx.executor().execute(NOOP);
ctx.channel().eventLoop().execute(NOOP);
ctx.write(Unpooled.buffer(), ctx.channel().newPromise()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
handlerExecuted.set(true);
}
});
}
});
super.channelRead(ctx, msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
assertSame(expectedEventLoop, ctx.executor().unwrap());
assertTrue(ctx.channel().isRegistered());
ctx.executor().execute(NOOP);
ctx.channel().eventLoop().execute(NOOP);
super.write(ctx, msg, promise);
}
}
}

View File

@ -21,6 +21,7 @@ import ch.qos.logback.core.Appender;
import io.netty.channel.local.LocalChannel;
import io.netty.util.concurrent.DefaultExecutorFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PausableEventExecutor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -32,10 +33,12 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -49,7 +52,9 @@ public class SingleThreadEventLoopTest {
public void run() { }
};
@SuppressWarnings({ "unused" })
private SingleThreadEventLoopA loopA;
@SuppressWarnings({ "unused" })
private SingleThreadEventLoopB loopB;
@Before
@ -176,17 +181,8 @@ public class SingleThreadEventLoopTest {
assertTrue(f.cancel(true));
assertEquals(5, timestamps.size());
// Check if the task was run without a lag.
Long previousTimestamp = null;
for (Long t: timestamps) {
if (previousTimestamp == null) {
previousTimestamp = t;
continue;
}
assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(90));
previousTimestamp = t;
}
// Check if the task was run without lag.
verifyTimestampDeltas(timestamps, 90);
}
@Test
@ -266,17 +262,8 @@ public class SingleThreadEventLoopTest {
assertTrue(f.cancel(true));
assertEquals(3, timestamps.size());
// Check if the task was run without a lag.
Long previousTimestamp = null;
for (Long t: timestamps) {
if (previousTimestamp == null) {
previousTimestamp = t;
continue;
}
assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(150));
previousTimestamp = t;
}
// Check if the task was run without lag.
verifyTimestampDeltas(timestamps, TimeUnit.MILLISECONDS.toNanos(150));
}
@Test
@ -435,12 +422,276 @@ public class SingleThreadEventLoopTest {
assertThat(loopA.isShutdown(), is(true));
}
@Test(timeout = 10000)
public void testScheduledTaskWakeupAfterDeregistration() throws Exception {
int numtasks = 5;
final List<Queue<Long>> timestampsPerTask = new ArrayList<Queue<Long>>(numtasks);
final List<ScheduledFuture> scheduledFutures = new ArrayList<ScheduledFuture>(numtasks);
// start the eventloops
loopA.execute(NOOP);
loopB.execute(NOOP);
LocalChannel channel = new LocalChannel();
ChannelPromise registerPromise = channel.newPromise();
channel.unsafe().register(loopA, registerPromise);
registerPromise.sync();
for (int i = 0; i < numtasks; i++) {
Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
timestampsPerTask.add(timestamps);
scheduledFutures.add(channel.eventLoop()
.scheduleAtFixedRate(new TimestampsRunnable(timestamps), 0, 100, TimeUnit.MILLISECONDS));
}
// Each task should be executed every 100ms.
// Will give them additional 50ms to execute ten times.
Thread.sleep(50 + 10 * 100);
// Deregister must stop future execution of scheduled tasks.
assertTrue(channel.deregister().sync().isSuccess());
for (Queue<Long> timestamps : timestampsPerTask) {
assertTrue(timestamps.size() >= 10);
verifyTimestampDeltas(timestamps, TimeUnit.MICROSECONDS.toNanos(90));
timestamps.clear();
}
// Because the Channel is deregistered no task must have executed since then.
Thread.sleep(200);
for (Queue<Long> timestamps : timestampsPerTask) {
assertTrue(timestamps.isEmpty());
}
registerPromise = channel.newPromise();
channel.unsafe().register(loopB, registerPromise);
registerPromise.sync();
// After the channel was registered with another eventloop the scheduled tasks should start executing again.
// Same as above.
Thread.sleep(50 + 10 * 100);
// Cancel all scheduled tasks.
for (ScheduledFuture f : scheduledFutures) {
assertTrue(f.cancel(true));
}
for (Queue<Long> timestamps : timestampsPerTask) {
assertTrue(timestamps.size() >= 10);
verifyTimestampDeltas(timestamps, TimeUnit.MICROSECONDS.toNanos(90));
}
}
/**
* Runnable that adds the current nano time to a list whenever
* it's executed.
*/
private static class TimestampsRunnable implements Runnable {
private Queue<Long> timestamps;
TimestampsRunnable(Queue<Long> timestamps) {
assertNotNull(timestamps);
this.timestamps = timestamps;
}
@Override
public void run() {
timestamps.add(System.nanoTime());
}
}
@Test(timeout = 10000)
public void testDeregisterRegisterMultipleTimesWithTasks() throws Exception {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
// need a final counter, so it can be used in an inner class.
final AtomicInteger i = new AtomicInteger(-1);
// start the eventloops
loopA.execute(NOOP);
loopB.execute(NOOP);
LocalChannel channel = new LocalChannel();
boolean firstRun = true;
ScheduledFuture f = null;
while (i.incrementAndGet() < 4) {
ChannelPromise registerPromise = channel.newPromise();
channel.unsafe().register(i.intValue() % 2 == 0 ? loopA : loopB, registerPromise);
registerPromise.sync();
if (firstRun) {
f = channel.eventLoop().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
assertTrue((i.intValue() % 2 == 0 ? loopA : loopB).inEventLoop());
timestamps.add(System.nanoTime());
}
}, 0, 100, TimeUnit.MILLISECONDS);
firstRun = false;
}
Thread.sleep(250);
assertTrue(channel.deregister().sync().isSuccess());
assertTrue("was " + timestamps.size(), timestamps.size() >= 2);
verifyTimestampDeltas(timestamps, TimeUnit.MILLISECONDS.toNanos(90));
timestamps.clear();
}
// cancel while the channel is deregistered
assertFalse(channel.isRegistered());
assertTrue(f.cancel(true));
assertTrue(timestamps.isEmpty());
// register again and check that it's not executed again.
ChannelPromise registerPromise = channel.newPromise();
channel.unsafe().register(loopA, registerPromise);
registerPromise.sync();
Thread.sleep(200);
assertTrue(timestamps.isEmpty());
}
@Test(timeout = 10000)
public void testDeregisterWithScheduleWithFixedDelayTask() throws Exception {
testDeregisterWithPeriodicScheduleTask(PeriodicScheduleMethod.FIXED_DELAY);
}
@Test(timeout = 10000)
public void testDeregisterWithScheduleAtFixedRateTask() throws Exception {
testDeregisterWithPeriodicScheduleTask(PeriodicScheduleMethod.FIXED_RATE);
}
private void testDeregisterWithPeriodicScheduleTask(PeriodicScheduleMethod method) throws Exception {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
// start the eventloops
loopA.execute(NOOP);
loopB.execute(NOOP);
LocalChannel channel = new LocalChannel();
ChannelPromise registerPromise = channel.newPromise();
channel.unsafe().register(loopA, registerPromise);
registerPromise.sync();
assertThat(channel.eventLoop(), instanceOf(PausableEventExecutor.class));
assertSame(loopA, channel.eventLoop().unwrap());
ScheduledFuture scheduleFuture;
if (PeriodicScheduleMethod.FIXED_RATE.equals(method)) {
scheduleFuture = channel.eventLoop().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
assertTrue(loopB.inEventLoop());
timestamps.add(System.nanoTime());
}
}, 100, 200, TimeUnit.MILLISECONDS);
} else {
scheduleFuture = channel.eventLoop().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
assertTrue(loopB.inEventLoop());
timestamps.add(System.nanoTime());
}
}, 100, 200, TimeUnit.MILLISECONDS);
}
assertTrue(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks());
ChannelFuture deregisterFuture = channel.deregister();
assertFalse(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks());
assertTrue(deregisterFuture.sync().isSuccess());
timestamps.clear();
Thread.sleep(1000);
// no scheduled tasks must be executed after deregistration.
assertTrue("size: " + timestamps.size(), timestamps.isEmpty());
assertFalse(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks());
registerPromise = channel.newPromise();
channel.unsafe().register(loopB, registerPromise);
assertTrue(registerPromise.sync().isSuccess());
assertTrue(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks());
assertThat(channel.eventLoop(), instanceOf(PausableEventExecutor.class));
assertSame(loopB, channel.eventLoop().unwrap());
// 100ms internal delay + 1 second. Should be able to execute 5 tasks in that time.
Thread.sleep(1150);
assertTrue(scheduleFuture.cancel(true));
assertTrue("was " + timestamps.size(), timestamps.size() >= 5);
verifyTimestampDeltas(timestamps, TimeUnit.MILLISECONDS.toNanos(190));
}
private static enum PeriodicScheduleMethod {
FIXED_RATE, FIXED_DELAY
}
private static void verifyTimestampDeltas(Queue<Long> timestamps, long minDelta) {
assertFalse(timestamps.isEmpty());
long prev = timestamps.poll();
for (Long timestamp : timestamps) {
long delta = timestamp - prev;
assertTrue(String.format("delta: %d, minDelta: %d", delta, minDelta), delta >= minDelta);
prev = timestamp;
}
}
@Test(timeout = 10000)
public void testDeregisterWithScheduledTask() throws Exception {
final AtomicBoolean oneTimeScheduledTaskExecuted = new AtomicBoolean(false);
// start the eventloops
loopA.execute(NOOP);
loopB.execute(NOOP);
LocalChannel channel = new LocalChannel();
ChannelPromise registerPromise = channel.newPromise();
channel.unsafe().register(loopA, registerPromise);
registerPromise.sync();
assertThat(channel.eventLoop(), instanceOf(PausableEventExecutor.class));
assertSame(loopA, ((PausableEventExecutor) channel.eventLoop()).unwrap());
io.netty.util.concurrent.ScheduledFuture scheduleFuture = channel.eventLoop().schedule(new Runnable() {
@Override
public void run() {
oneTimeScheduledTaskExecuted.set(true);
assertTrue(loopB.inEventLoop());
}
}, 1, TimeUnit.SECONDS);
assertTrue(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks());
ChannelFuture deregisterFuture = channel.deregister();
assertFalse(((PausableEventExecutor) channel.eventLoop()).isAcceptingNewTasks());
assertTrue(deregisterFuture.sync().isSuccess());
Thread.sleep(1000);
registerPromise = channel.newPromise();
assertFalse(oneTimeScheduledTaskExecuted.get());
channel.unsafe().register(loopB, registerPromise);
registerPromise.sync();
assertThat(channel.eventLoop(), instanceOf(PausableEventExecutor.class));
assertSame(loopB, ((PausableEventExecutor) channel.eventLoop()).unwrap());
assertTrue(scheduleFuture.sync().isSuccess());
assertTrue(oneTimeScheduledTaskExecuted.get());
}
private static class SingleThreadEventLoopA extends SingleThreadEventLoop {
final AtomicInteger cleanedUp = new AtomicInteger();
SingleThreadEventLoopA() {
super(null, new DefaultExecutorFactory("A").newExecutor(1), true);
super(null, Executors.newSingleThreadExecutor(), true);
}
@Override
@ -470,7 +721,7 @@ public class SingleThreadEventLoopTest {
private volatile boolean interrupted;
SingleThreadEventLoopB() {
super(null, new DefaultExecutorFactory("B").newExecutor(1), false);
super(null, Executors.newSingleThreadExecutor(), false);
}
@Override

View File

@ -55,7 +55,7 @@ public class LocalTransportThreadModelTest2 {
serverBootstrap.bind(new LocalAddress(LOCAL_CHANNEL)).sync();
int count = 100;
for (int i = 1; i < count + 1; i ++) {
for (int i = 1; i < count + 1; i++) {
Channel ch = clientBootstrap.connect().sync().channel();
// SPIN until we get what we are looking for.