Eliminate unnecessary extra synchronization in DefaultChannelPipeline
Motivation: At the moment whenever we add/remove a ChannelHandler with an EventExecutorGroup we have two synchronization points in the execution path. One to find the childInvoker and one for add/remove itself. We can eliminate the former by call findIInvoker in the synchronization block, as we need to synchronize anyway. Modification: Remove the usage of AtomicFieldUpdater and the extra synchronization in findInvoker by moving the call of the method in the synchronized(this) block. Result: Less synchronization points and volatile reads/writes
This commit is contained in:
parent
1492b32da7
commit
d8d822fddd
@ -36,7 +36,6 @@ import java.util.NoSuchElementException;
|
||||
import java.util.WeakHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
|
||||
/**
|
||||
* The default {@link ChannelPipeline} implementation. It is usually created
|
||||
@ -50,21 +49,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
private static final WeakHashMap<Class<?>, String>[] nameCaches =
|
||||
new WeakHashMap[Runtime.getRuntime().availableProcessors()];
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, Map> childInvokersUpdater;
|
||||
|
||||
static {
|
||||
for (int i = 0; i < nameCaches.length; i ++) {
|
||||
nameCaches[i] = new WeakHashMap<Class<?>, String>();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
AtomicReferenceFieldUpdater<DefaultChannelPipeline, Map> updater;
|
||||
updater = PlatformDependent.newAtomicReferenceFieldUpdater(DefaultChannelPipeline.class, "childInvokers");
|
||||
if (updater == null) {
|
||||
updater = AtomicReferenceFieldUpdater.newUpdater(DefaultChannelPipeline.class, Map.class, "childInvokers");
|
||||
}
|
||||
childInvokersUpdater = updater;
|
||||
}
|
||||
|
||||
final AbstractChannel channel;
|
||||
@ -76,12 +64,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
new HashMap<String, DefaultChannelHandlerContext>(4);
|
||||
|
||||
/**
|
||||
* Updated by {@link #childInvokersUpdater}.
|
||||
*
|
||||
* @see #findInvoker(EventExecutorGroup)
|
||||
*/
|
||||
@SuppressWarnings("UnusedDeclaration")
|
||||
private volatile Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;
|
||||
private Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;
|
||||
|
||||
DefaultChannelPipeline(AbstractChannel channel) {
|
||||
if (channel == null) {
|
||||
@ -111,7 +96,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
|
||||
return addFirst(findInvoker(group), name, handler);
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
addFirst0(name, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -144,7 +133,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
|
||||
return addLast(findInvoker(group), name, handler);
|
||||
synchronized (this) {
|
||||
checkDuplicateName(name);
|
||||
addLast0(name, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -177,7 +170,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
|
||||
return addBefore(findInvoker(group), baseName, name, handler);
|
||||
synchronized (this) {
|
||||
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||
checkDuplicateName(name);
|
||||
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -211,7 +209,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
|
||||
return addAfter(findInvoker(group), baseName, name, handler);
|
||||
synchronized (this) {
|
||||
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
|
||||
checkDuplicateName(name);
|
||||
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -246,7 +249,26 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) {
|
||||
return addFirst(findInvoker(group), handlers);
|
||||
if (handlers == null) {
|
||||
throw new NullPointerException("handlers");
|
||||
}
|
||||
if (handlers.length == 0 || handlers[0] == null) {
|
||||
return this;
|
||||
}
|
||||
|
||||
int size;
|
||||
for (size = 1; size < handlers.length; size ++) {
|
||||
if (handlers[size] == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = size - 1; i >= 0; i --) {
|
||||
ChannelHandler h = handlers[i];
|
||||
addFirst(group, generateName(h), h);
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -280,7 +302,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
@Override
|
||||
public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) {
|
||||
return addLast(findInvoker(group), handlers);
|
||||
if (handlers == null) {
|
||||
throw new NullPointerException("handlers");
|
||||
}
|
||||
|
||||
for (ChannelHandler h: handlers) {
|
||||
if (h == null) {
|
||||
break;
|
||||
}
|
||||
addLast(group, generateName(h), h);
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -299,6 +332,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
return this;
|
||||
}
|
||||
|
||||
// No need for synchronization because it is always executed in a synchronized(this) block.
|
||||
private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) {
|
||||
if (group == null) {
|
||||
return null;
|
||||
@ -307,26 +341,20 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
||||
// Lazily initialize the data structure that maps an EventExecutorGroup to a ChannelHandlerInvoker.
|
||||
Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers = this.childInvokers;
|
||||
if (childInvokers == null) {
|
||||
childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>();
|
||||
if (!childInvokersUpdater.compareAndSet(this, null, childInvokers)) {
|
||||
childInvokers = this.childInvokers;
|
||||
}
|
||||
childInvokers = this.childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>(4);
|
||||
}
|
||||
|
||||
// Pick one of the child executors and remember its invoker
|
||||
// so that the same invoker is used to fire events for the same channel.
|
||||
ChannelHandlerInvoker invoker;
|
||||
synchronized (childInvokers) {
|
||||
invoker = childInvokers.get(group);
|
||||
if (invoker == null) {
|
||||
EventExecutor executor = group.next();
|
||||
if (executor instanceof EventLoop) {
|
||||
invoker = ((EventLoop) executor).asInvoker();
|
||||
} else {
|
||||
invoker = new DefaultChannelHandlerInvoker(executor);
|
||||
}
|
||||
childInvokers.put(group, invoker);
|
||||
ChannelHandlerInvoker invoker = childInvokers.get(group);
|
||||
if (invoker == null) {
|
||||
EventExecutor executor = group.next();
|
||||
if (executor instanceof EventLoop) {
|
||||
invoker = ((EventLoop) executor).asInvoker();
|
||||
} else {
|
||||
invoker = new DefaultChannelHandlerInvoker(executor);
|
||||
}
|
||||
childInvokers.put(group, invoker);
|
||||
}
|
||||
|
||||
return invoker;
|
||||
|
Loading…
x
Reference in New Issue
Block a user