Eliminate unnecessary extra synchronization in DefaultChannelPipeline

Motivation:
At the moment whenever we add/remove a ChannelHandler with an EventExecutorGroup we have two synchronization points in the execution path. One to find the childInvoker and one for add/remove itself. We can eliminate the former by call findIInvoker in the synchronization block, as we need to synchronize anyway.

Modification:
Remove the usage of AtomicFieldUpdater and the extra synchronization in findInvoker by moving the call of the method in the synchronized(this) block.

Result:
Less synchronization points and volatile reads/writes
This commit is contained in:
Norman Maurer 2014-04-25 08:46:10 +02:00
parent 958a3b70ce
commit 1b8f34f2a6

View File

@ -36,7 +36,6 @@ import java.util.NoSuchElementException;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* The default {@link ChannelPipeline} implementation. It is usually created
@ -50,21 +49,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private static final WeakHashMap<Class<?>, String>[] nameCaches =
new WeakHashMap[Runtime.getRuntime().availableProcessors()];
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, Map> childInvokersUpdater;
static {
for (int i = 0; i < nameCaches.length; i ++) {
nameCaches[i] = new WeakHashMap<Class<?>, String>();
}
@SuppressWarnings("rawtypes")
AtomicReferenceFieldUpdater<DefaultChannelPipeline, Map> updater;
updater = PlatformDependent.newAtomicReferenceFieldUpdater(DefaultChannelPipeline.class, "childInvokers");
if (updater == null) {
updater = AtomicReferenceFieldUpdater.newUpdater(DefaultChannelPipeline.class, Map.class, "childInvokers");
}
childInvokersUpdater = updater;
}
final AbstractChannel channel;
@ -76,12 +64,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
new HashMap<String, DefaultChannelHandlerContext>(4);
/**
* Updated by {@link #childInvokersUpdater}.
*
* @see #findInvoker(EventExecutorGroup)
*/
@SuppressWarnings("UnusedDeclaration")
private volatile Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;
private Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;
DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
@ -111,7 +96,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
return addFirst(findInvoker(group), name, handler);
synchronized (this) {
checkDuplicateName(name);
addFirst0(name, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
}
return this;
}
@Override
@ -144,7 +133,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
return addLast(findInvoker(group), name, handler);
synchronized (this) {
checkDuplicateName(name);
addLast0(name, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
}
return this;
}
@Override
@ -177,7 +170,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
return addBefore(findInvoker(group), baseName, name, handler);
synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name);
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
}
return this;
}
@Override
@ -211,7 +209,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
return addAfter(findInvoker(group), baseName, name, handler);
synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name);
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
}
return this;
}
@Override
@ -246,7 +249,26 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) {
return addFirst(findInvoker(group), handlers);
if (handlers == null) {
throw new NullPointerException("handlers");
}
if (handlers.length == 0 || handlers[0] == null) {
return this;
}
int size;
for (size = 1; size < handlers.length; size ++) {
if (handlers[size] == null) {
break;
}
}
for (int i = size - 1; i >= 0; i --) {
ChannelHandler h = handlers[i];
addFirst(group, generateName(h), h);
}
return this;
}
@Override
@ -280,7 +302,18 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) {
return addLast(findInvoker(group), handlers);
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(group, generateName(h), h);
}
return this;
}
@Override
@ -299,6 +332,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return this;
}
// No need for synchronization because it is always executed in a synchronized(this) block.
private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) {
if (group == null) {
return null;
@ -307,26 +341,20 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// Lazily initialize the data structure that maps an EventExecutorGroup to a ChannelHandlerInvoker.
Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers = this.childInvokers;
if (childInvokers == null) {
childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>();
if (!childInvokersUpdater.compareAndSet(this, null, childInvokers)) {
childInvokers = this.childInvokers;
}
childInvokers = this.childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>(4);
}
// Pick one of the child executors and remember its invoker
// so that the same invoker is used to fire events for the same channel.
ChannelHandlerInvoker invoker;
synchronized (childInvokers) {
invoker = childInvokers.get(group);
if (invoker == null) {
EventExecutor executor = group.next();
if (executor instanceof EventLoop) {
invoker = ((EventLoop) executor).asInvoker();
} else {
invoker = new DefaultChannelHandlerInvoker(executor);
}
childInvokers.put(group, invoker);
ChannelHandlerInvoker invoker = childInvokers.get(group);
if (invoker == null) {
EventExecutor executor = group.next();
if (executor instanceof EventLoop) {
invoker = ((EventLoop) executor).asInvoker();
} else {
invoker = new DefaultChannelHandlerInvoker(executor);
}
childInvokers.put(group, invoker);
}
return invoker;