[#5486] Not operate on serial execution assumption when using EventExecutor in the DefaultChannelPipeline.

Motivation:

In commit f984870ccca133d6056e8b0df0b2352f8f90b0fe I made a change which operated under invalide assumption that tasks executed by an EventExecutor will always be processed in a serial fashion. This is true for SingleThreadEventExecutor sub-classes but not part of the EventExecutor interface contract.

Because of this change implementations of EventExecutor which not strictly execute tasks in a serial fashion may miss events before handlerAdded(...) is called. This is strictly speaking not correct as there is not guarantee in this case that handlerAdded(...) will be called as first task (as there is no ordering guarentee).

Cassandra itself ships such an EventExecutor implementation which has no strict ordering to spread load across multiple threads.

Modifications:

- Add new OrderedEventExecutor interface and let SingleThreadEventExecutor / EventLoop implement / extend it.
- Only expose "restriction" of skipping events until handlerAdded(...) is called for OrderedEventExecutor implementations
- Add ThreadPoolEventExecutor implementation which executes tasks in an unordered fashion. This is used in added unit test but can also be used for protocols which not expose an strict ordering.
- Add unit test.

Result:

Resurrect the possibility to implement an EventExecutor which does not enforce serial execution of events and be able to use it with the DefaultChannelPipeline.
This commit is contained in:
Norman Maurer 2016-07-03 22:28:47 +02:00
parent 6492cb98b2
commit 29fdb160f3
7 changed files with 388 additions and 41 deletions

View File

@ -0,0 +1,22 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
/**
* Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion.
*/
public interface OrderedEventExecutor extends EventExecutor {
}

View File

@ -38,10 +38,10 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/** /**
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread. * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
* *
*/ */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor { public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16, static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE)); SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

View File

@ -0,0 +1,251 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* {@link EventExecutor} implementation which makes no guarantees about the ordering of task execution that
* are submitted because there may be multiple threads executing these tasks.
* This implementation is most useful for protocols that do not need strict ordering.
*
* <strong>Because it provides no ordering care should be taken when using it!</strong>
*/
public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolExecutor implements EventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(
UnorderedThreadPoolEventExecutor.class);
private final Promise<?> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
private final Set<EventExecutor> executorSet = Collections.singleton((EventExecutor) this);
/**
* Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int, ThreadFactory)}
* using {@link DefaultThreadFactory}.
*/
public UnorderedThreadPoolEventExecutor(int corePoolSize) {
this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class));
}
/**
* See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory)}
*/
public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
/**
* Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int,
* ThreadFactory, java.util.concurrent.RejectedExecutionHandler)} using {@link DefaultThreadFactory}.
*/
public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) {
this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class), handler);
}
/**
* See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory, RejectedExecutionHandler)}
*/
public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, threadFactory, handler);
}
@Override
public EventExecutor next() {
return this;
}
@Override
public EventExecutorGroup parent() {
return this;
}
@Override
public boolean inEventLoop() {
return false;
}
@Override
public boolean inEventLoop(Thread thread) {
return false;
}
@Override
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}
@Override
public <V> ProgressivePromise<V> newProgressivePromise() {
return new DefaultProgressivePromise<V>(this);
}
@Override
public <V> Future<V> newSucceededFuture(V result) {
return new SucceededFuture<V>(this, result);
}
@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
return new FailedFuture<V>(this, cause);
}
@Override
public boolean isShuttingDown() {
return isShutdown();
}
@Override
public List<Runnable> shutdownNow() {
List<Runnable> tasks = super.shutdownNow();
terminationFuture.trySuccess(null);
return tasks;
}
@Override
public void shutdown() {
super.shutdown();
terminationFuture.trySuccess(null);
}
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
// TODO: At the moment this just calls shutdown but we may be able to do something more smart here which
// respects the quietPeriod and timeout.
shutdown();
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override
public Iterator<EventExecutor> iterator() {
return executorSet.iterator();
}
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return new RunnableScheduledFutureTask<V>(this, runnable, task);
}
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return new RunnableScheduledFutureTask<V>(this, callable, task);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return (ScheduledFuture<?>) super.schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return (ScheduledFuture<V>) super.schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return (ScheduledFuture<?>) super.scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return (ScheduledFuture<?>) super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
@Override
public Future<?> submit(Runnable task) {
return (Future<?>) super.submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return (Future<T>) super.submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return (Future<T>) super.submit(task);
}
private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
implements RunnableScheduledFuture<V>, ScheduledFuture<V> {
private final RunnableScheduledFuture<V> future;
RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable,
RunnableScheduledFuture<V> future) {
super(executor, runnable, null);
this.future = future;
}
RunnableScheduledFutureTask(EventExecutor executor, Callable<V> callable,
RunnableScheduledFuture<V> future) {
super(executor, callable);
this.future = future;
}
@Override
public void run() {
if (!isPeriodic()) {
super.run();
} else if (!isDone()) {
try {
// Its a periodic task so we need to ignore the return value
task.call();
} catch (Throwable cause) {
if (!tryFailureInternal(cause)) {
logger.warn("Failure during execution of task", cause);
}
}
}
}
@Override
public boolean isPeriodic() {
return future.isPeriodic();
}
@Override
public long getDelay(TimeUnit unit) {
return future.getDelay(unit);
}
@Override
public int compareTo(Delayed o) {
return future.compareTo(o);
}
}
}

View File

@ -23,6 +23,8 @@ import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint; import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.OrderedEventExecutor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -31,6 +33,7 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint { implements ChannelHandlerContext, ResourceLeakHint {
@ -39,14 +42,30 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev; volatile AbstractChannelHandlerContext prev;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER;
static {
AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> handlerStateUpdater = PlatformDependent
.newAtomicIntegerFieldUpdater(AbstractChannelHandlerContext.class, "handlerState");
if (handlerStateUpdater == null) {
handlerStateUpdater = AtomicIntegerFieldUpdater
.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
}
HANDLER_STATE_UPDATER = handlerStateUpdater;
}
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
*/
private static final int ADD_PENDING = 1;
/** /**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/ */
private static final int ADDED = 1; private static final int ADD_COMPLETE = 2;
/** /**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/ */
private static final int REMOVED = 2; private static final int REMOVE_COMPLETE = 3;
/** /**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
@ -57,12 +76,12 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
private final boolean outbound; private final boolean outbound;
private final DefaultChannelPipeline pipeline; private final DefaultChannelPipeline pipeline;
private final String name; private final String name;
private final boolean ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the // Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor. // child executor.
final EventExecutor executor; final EventExecutor executor;
private ChannelFuture succeededFuture; private ChannelFuture succeededFuture;
private int handlerState = INIT;
// Lazily instantiated tasks used to trigger events to a handler with different executor. // Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed. // There is no need to make this volatile as at worse it will just create a few more instances then needed.
@ -71,18 +90,17 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
private Runnable invokeChannelWritableStateChangedTask; private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask; private Runnable invokeFlushTask;
private volatile int handlerState = INIT;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) { boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
if (name == null) {
throw new NullPointerException("name");
}
this.pipeline = pipeline; this.pipeline = pipeline;
this.name = name;
this.executor = executor; this.executor = executor;
this.inbound = inbound; this.inbound = inbound;
this.outbound = outbound; this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
} }
@Override @Override
@ -135,7 +153,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeChannelRegistered() { private void invokeChannelRegistered() {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelInboundHandler) handler()).channelRegistered(this); ((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -167,7 +185,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeChannelUnregistered() { private void invokeChannelUnregistered() {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelInboundHandler) handler()).channelUnregistered(this); ((ChannelInboundHandler) handler()).channelUnregistered(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -200,7 +218,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeChannelActive() { private void invokeChannelActive() {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelInboundHandler) handler()).channelActive(this); ((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -232,7 +250,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeChannelInactive() { private void invokeChannelInactive() {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelInboundHandler) handler()).channelInactive(this); ((ChannelInboundHandler) handler()).channelInactive(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -272,7 +290,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeExceptionCaught(final Throwable cause) { private void invokeExceptionCaught(final Throwable cause) {
if (isAdded()) { if (invokeHandler()) {
try { try {
handler().exceptionCaught(this, cause); handler().exceptionCaught(this, cause);
} catch (Throwable error) { } catch (Throwable error) {
@ -316,7 +334,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeUserEventTriggered(Object event) { private void invokeUserEventTriggered(Object event) {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelInboundHandler) handler()).userEventTriggered(this, event); ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) { } catch (Throwable t) {
@ -349,7 +367,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeChannelRead(Object msg) { private void invokeChannelRead(Object msg) {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelInboundHandler) handler()).channelRead(this, msg); ((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) { } catch (Throwable t) {
@ -385,7 +403,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeChannelReadComplete() { private void invokeChannelReadComplete() {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelInboundHandler) handler()).channelReadComplete(this); ((ChannelInboundHandler) handler()).channelReadComplete(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -421,7 +439,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeChannelWritabilityChanged() { private void invokeChannelWritabilityChanged() {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelInboundHandler) handler()).channelWritabilityChanged(this); ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -488,7 +506,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) { } catch (Throwable t) {
@ -532,7 +550,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) { } catch (Throwable t) {
@ -576,7 +594,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeDisconnect(ChannelPromise promise) { private void invokeDisconnect(ChannelPromise promise) {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelOutboundHandler) handler()).disconnect(this, promise); ((ChannelOutboundHandler) handler()).disconnect(this, promise);
} catch (Throwable t) { } catch (Throwable t) {
@ -611,7 +629,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeClose(ChannelPromise promise) { private void invokeClose(ChannelPromise promise) {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelOutboundHandler) handler()).close(this, promise); ((ChannelOutboundHandler) handler()).close(this, promise);
} catch (Throwable t) { } catch (Throwable t) {
@ -646,7 +664,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeDeregister(ChannelPromise promise) { private void invokeDeregister(ChannelPromise promise) {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelOutboundHandler) handler()).deregister(this, promise); ((ChannelOutboundHandler) handler()).deregister(this, promise);
} catch (Throwable t) { } catch (Throwable t) {
@ -680,7 +698,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeRead() { private void invokeRead() {
if (isAdded()) { if (invokeHandler()) {
try { try {
((ChannelOutboundHandler) handler()).read(this); ((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -718,7 +736,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeWrite(Object msg, ChannelPromise promise) { private void invokeWrite(Object msg, ChannelPromise promise) {
if (isAdded()) { if (invokeHandler()) {
invokeWrite0(msg, promise); invokeWrite0(msg, promise);
} else { } else {
write(msg, promise); write(msg, promise);
@ -756,7 +774,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeFlush() { private void invokeFlush() {
if (isAdded()) { if (invokeHandler()) {
invokeFlush0(); invokeFlush0();
} else { } else {
flush(); flush();
@ -789,7 +807,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (isAdded()) { if (invokeHandler()) {
invokeWrite0(msg, promise); invokeWrite0(msg, promise);
invokeFlush0(); invokeFlush0();
} else { } else {
@ -947,11 +965,24 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
} }
final void setRemoved() { final void setRemoved() {
handlerState = REMOVED; handlerState = REMOVE_COMPLETE;
} }
final void setAdded() { final void setAddComplete() {
handlerState = ADDED; for (;;) {
int oldState = handlerState;
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
final void setAddPending() {
boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
} }
/** /**
@ -960,15 +991,17 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
* *
* If this method returns {@code true} we will not invoke the {@link ChannelHandler} but just forward the event. * If this method returns {@code true} we will not invoke the {@link ChannelHandler} but just forward the event.
* This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
* but not called {@link } * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
*/ */
private boolean isAdded() { private boolean invokeHandler() {
return handlerState == ADDED; // Store in local variable to reduce volatile reads.
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
} }
@Override @Override
public boolean isRemoved() { public boolean isRemoved() {
return handlerState == REMOVED; return handlerState == REMOVE_COMPLETE;
} }
@Override @Override

View File

@ -154,12 +154,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
// In this case we add the context to the pipeline and add a task that will call // In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered. // ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) { if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(newCtx, true);
return this; return this;
} }
EventExecutor executor = newCtx.executor(); EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -200,12 +202,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
// In this case we add the context to the pipeline and add a task that will call // In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered. // ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) { if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(newCtx, true);
return this; return this;
} }
EventExecutor executor = newCtx.executor(); EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -250,12 +254,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
// In this case we add the context to the pipeline and add a task that will call // In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered. // ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) { if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(newCtx, true);
return this; return this;
} }
EventExecutor executor = newCtx.executor(); EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -308,11 +314,13 @@ public class DefaultChannelPipeline implements ChannelPipeline {
// In this case we remove the context from the pipeline and add a task that will call // In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered. // ChannelHandler.handlerRemoved(...) once the channel is registered.
if (!registered) { if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true); callHandlerCallbackLater(newCtx, true);
return this; return this;
} }
EventExecutor executor = newCtx.executor(); EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) { if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -583,7 +591,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try { try {
ctx.handler().handlerAdded(ctx); ctx.handler().handlerAdded(ctx);
ctx.setAdded(); ctx.setAddComplete();
} catch (Throwable t) { } catch (Throwable t) {
boolean removed = false; boolean removed = false;
try { try {
@ -1153,7 +1161,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
TailContext(DefaultChannelPipeline pipeline) { TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false); super(pipeline, null, TAIL_NAME, true, false);
setAdded(); setAddComplete();
} }
@Override @Override
@ -1212,7 +1220,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
HeadContext(DefaultChannelPipeline pipeline) { HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true); super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe(); unsafe = pipeline.channel().unsafe();
setAdded(); setAddComplete();
} }
@Override @Override

View File

@ -15,7 +15,7 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.OrderedEventExecutor;
/** /**
* Will handle all the I/O operations for a {@link Channel} once registered. * Will handle all the I/O operations for a {@link Channel} once registered.
@ -24,7 +24,7 @@ import io.netty.util.concurrent.EventExecutor;
* implementation details and internals. * implementation details and internals.
* *
*/ */
public interface EventLoop extends EventExecutor, EventLoopGroup { public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override @Override
EventLoopGroup parent(); EventLoopGroup parent();
} }

View File

@ -35,6 +35,7 @@ import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
@ -52,6 +53,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -899,6 +901,37 @@ public class DefaultChannelPipelineTest {
pipeline.addBefore("test", null, newHandler()); pipeline.addBefore("test", null, newHandler());
} }
@Test(timeout = 3000)
public void testUnorderedEventExecutor() throws Throwable {
ChannelPipeline pipeline1 = new LocalChannel().pipeline();
EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(2);
EventLoopGroup defaultGroup = new DefaultEventLoopGroup(1);
try {
EventLoop eventLoop1 = defaultGroup.next();
eventLoop1.register(pipeline1.channel()).syncUninterruptibly();
final CountDownLatch latch = new CountDownLatch(1);
pipeline1.addLast(eventExecutors, new ChannelInboundHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// Just block one of the two threads.
LockSupport.park();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
latch.countDown();
}
});
// Trigger an event, as we use UnorderedEventExecutor userEventTriggered should be called even when
// handlerAdded(...) blocks.
pipeline1.fireUserEventTriggered("");
latch.await();
} finally {
defaultGroup.shutdownGracefully().syncUninterruptibly();
eventExecutors.shutdownGracefully().syncUninterruptibly();
}
}
private static final class TestTask implements Runnable { private static final class TestTask implements Runnable {
private final ChannelPipeline pipeline; private final ChannelPipeline pipeline;