Make sure we can't deadlock even if the ChannelPipeline modification is executed by the EventExecutor

This commit is contained in:
norman 2012-06-05 11:21:44 +02:00
parent c2e65016fd
commit 4eb42125a7

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
/**
* The default {@link ChannelPipeline} implementation. It is usually created
@ -54,6 +55,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
final Map<EventExecutor, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutor, EventExecutor>();
public DefaultChannelPipeline(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
@ -80,23 +82,34 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelPipeline addFirst(EventExecutor executor, final String name, final ChannelHandler handler) {
public ChannelPipeline addFirst(EventExecutor executor, final String name, final ChannelHandler handler) {
try {
Future<?> future;
synchronized (this) {
checkDuplicateName(name);
final DefaultChannelHandlerContext nextCtx = head.next;
final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, executor, head, nextCtx, name, handler);
final DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, executor, head, nextCtx, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addFirst0(name, nextCtx, newCtx);
} else {
try {
newCtx.executor().submit(new Runnable() {
return this;
}
future = newCtx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addFirst0(name, nextCtx, newCtx);
}
}).get();
}
});
}
// Call Future.get() outside of the synchronized block to prevent dead-lock
future.get();
return this;
} catch (RuntimeException e) {
throw e;
} catch (Error e) {
@ -104,10 +117,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} catch (Throwable t) {
throw new ChannelPipelineException(t);
}
}
return this;
}
private void addFirst0(final String name, DefaultChannelHandlerContext nextCtx, DefaultChannelHandlerContext newCtx) {
callBeforeAdd(newCtx);
@ -126,24 +138,33 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelPipeline addLast(EventExecutor executor, final String name, final ChannelHandler handler) {
checkDuplicateName(name);
final DefaultChannelHandlerContext oldTail = tail;
final DefaultChannelHandlerContext newTail =
new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler);
public ChannelPipeline addLast(EventExecutor executor, final String name, final ChannelHandler handler) {
try {
Future<?> future;
synchronized (this) {
checkDuplicateName(name);
final DefaultChannelHandlerContext oldTail = tail;
final DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler);
if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) {
addLast0(name, oldTail, newTail);
return this;
} else {
try {
newTail.executor().submit(new Runnable() {
future = newTail.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addLast0(name, oldTail, newTail);
}
}).get();
}
});
}
}
// Call Future.get() outside of synchronized block to prevent dead-lock
future.get();
} catch (RuntimeException e) {
throw e;
} catch (Error e) {
@ -151,7 +172,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} catch (Throwable t) {
throw new ChannelPipelineException(t);
}
}
return this;
}
@ -171,23 +192,37 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelPipeline addBefore(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) {
public ChannelPipeline addBefore(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) {
try {
Future<?> future;
synchronized (this) {
final DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name);
final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler);
final DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addBefore0(name, ctx, newCtx);
return this;
} else {
try {
newCtx.executor().submit(new Runnable() {
future = newCtx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addBefore0(name, ctx, newCtx);
}
}).get();
}
});
}
}
// Call Future.get() outside of the synchronized to prevent dead-lock
future.get();
} catch (RuntimeException e) {
throw e;
} catch (Error e) {
@ -195,7 +230,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} catch (Throwable t) {
throw new ChannelPipelineException(t);
}
}
return this;
}
@ -215,26 +250,37 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelPipeline addAfter(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) {
public ChannelPipeline addAfter(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) {
try {
Future<?> future;
synchronized (this) {
final DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
if (ctx == tail) {
addLast(name, handler);
} else {
return addLast(name, handler);
}
checkDuplicateName(name);
final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, executor, ctx, ctx.next, name, handler);
final DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, executor, ctx, ctx.next, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addAfter0(name, ctx, newCtx);
return this;
} else {
try {
newCtx.executor().submit(new Runnable() {
future = newCtx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addAfter0(name, ctx, newCtx);
}
}).get();
}
});
}
}
future.get();
return this;
} catch (RuntimeException e) {
throw e;
} catch (Error e) {
@ -242,9 +288,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} catch (Throwable t) {
throw new ChannelPipelineException(t);
}
}
}
return this;
}
private void addAfter0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
@ -320,38 +364,73 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized void remove(ChannelHandler handler) {
public void remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
}
@Override
public synchronized ChannelHandler remove(String name) {
public ChannelHandler remove(String name) {
return remove(getContextOrDie(name)).handler();
}
@Override
public synchronized <T extends ChannelHandler> T remove(Class<T> handlerType) {
public <T extends ChannelHandler> T remove(Class<T> handlerType) {
return (T) remove(getContextOrDie(handlerType)).handler();
}
private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) {
try {
DefaultChannelHandlerContext context;
Future<?> future;
synchronized (this) {
if (head == tail) {
return null;
} else if (ctx == head) {
throw new Error(); // Should never happen.
} else if (ctx == tail) {
removeLast();
if (head == tail) {
throw new NoSuchElementException();
}
final DefaultChannelHandlerContext oldTail = tail;
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
removeLast0(oldTail);
return oldTail;
} else {
future = oldTail.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (oldTail) {
removeLast0(oldTail);
}
}
});
context = oldTail;
}
} else {
if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) {
remove0(ctx);
return ctx;
} else {
try {
ctx.executor().submit(new Runnable() {
future = ctx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
remove0(ctx);
}
}).get();
}
});
context = ctx;
}
}
}
// call the Future.get() outside of the synchronization block to prevent from dead-lock
future.get();
return context;
} catch (RuntimeException e) {
throw e;
} catch (Error e) {
@ -359,9 +438,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} catch (Throwable t) {
throw new ChannelPipelineException(t);
}
}
}
return ctx;
}
private void remove0(DefaultChannelHandlerContext ctx) {
@ -377,7 +454,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelHandler removeFirst() {
public ChannelHandler removeFirst() {
if (head == tail) {
throw new NoSuchElementException();
}
@ -385,22 +462,33 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelHandler removeLast() {
public ChannelHandler removeLast() {
try {
Future<?> future;
final DefaultChannelHandlerContext oldTail;
synchronized (this) {
if (head == tail) {
throw new NoSuchElementException();
}
final DefaultChannelHandlerContext oldTail = tail;
oldTail = tail;
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
removeLast0(oldTail);
return oldTail.handler();
} else {
try {
oldTail.executor().submit(new Runnable() {
future = oldTail.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
removeLast0(oldTail);
}
}).get();
}
});
}
}
// call Future.get() outside of the synchronized block to prevent deadlock
future.get();
return oldTail.handler();
} catch (RuntimeException e) {
throw e;
} catch (Error e) {
@ -408,8 +496,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} catch (Throwable t) {
throw new ChannelPipelineException(t);
}
}
return oldTail.handler();
}
private void removeLast0(DefaultChannelHandlerContext oldTail) {
@ -423,28 +511,55 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
public void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
replace(getContextOrDie(oldHandler), newName, newHandler);
}
@Override
public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
return replace(getContextOrDie(oldName), newName, newHandler);
}
@Override
@SuppressWarnings("unchecked")
public synchronized <T extends ChannelHandler> T replace(
public <T extends ChannelHandler> T replace(
Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
}
private ChannelHandler replace(final DefaultChannelHandlerContext ctx, final String newName, final ChannelHandler newHandler) {
try {
Future<?> future;
synchronized (this) {
if (ctx == head) {
throw new IllegalArgumentException();
} else if (ctx == tail) {
removeLast();
addLast(newName, newHandler);
if (head == tail) {
throw new NoSuchElementException();
}
final DefaultChannelHandlerContext oldTail = tail;
final DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(this, null, oldTail, null, newName, newHandler);
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
removeLast0(oldTail);
checkDuplicateName(newName);
addLast0(newName, tail, newTail);
return ctx.handler();
} else {
future = oldTail.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
removeLast0(oldTail);
checkDuplicateName(newName);
addLast0(newName, tail, newTail);
}
}
});
}
} else {
boolean sameName = ctx.name().equals(newName);
if (!sameName) {
@ -453,19 +568,27 @@ public class DefaultChannelPipeline implements ChannelPipeline {
DefaultChannelHandlerContext prev = ctx.prev;
DefaultChannelHandlerContext next = ctx.next;
final DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, ctx.executor, prev, next, newName, newHandler);
final DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, ctx.executor, prev, next, newName, newHandler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
replace0(ctx, newName, newCtx);
return ctx.handler();
} else {
try {
newCtx.executor().submit(new Runnable() {
future = newCtx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
replace0(ctx, newName, newCtx);
}
}).get();
}
});
}
}
}
future.get();
return ctx.handler();
} catch (RuntimeException e) {
throw e;
} catch (Error e) {
@ -474,10 +597,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new ChannelPipelineException(t);
}
}
}
return ctx.handler();
}
private void replace0(DefaultChannelHandlerContext ctx, String newName, DefaultChannelHandlerContext newCtx) {
boolean sameName = ctx.name().equals(newName);