Rethrow the Throwable which accoured while execution the modification in the EventExecutor

This commit is contained in:
norman 2012-06-05 11:34:47 +02:00
parent 4eb42125a7
commit caa35c9772

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@ -84,7 +85,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addFirst(EventExecutor executor, final String name, final ChannelHandler handler) {
try {
Future<?> future;
Future<Throwable> future;
synchronized (this) {
checkDuplicateName(name);
@ -95,19 +96,22 @@ public class DefaultChannelPipeline implements ChannelPipeline {
addFirst0(name, nextCtx, newCtx);
return this;
}
future = newCtx.executor().submit(new Runnable() {
future = newCtx.executor().submit(new AsyncPipelineModification() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addFirst0(name, nextCtx, newCtx);
}
void doCall() {
checkDuplicateName(name);
addFirst0(name, nextCtx, newCtx);
}
});
}
// Call Future.get() outside of the synchronized block to prevent dead-lock
future.get();
Throwable result = future.get();
if (result != null) {
// re-throw exception that was caught
throw result;
}
return this;
} catch (RuntimeException e) {
@ -140,7 +144,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addLast(EventExecutor executor, final String name, final ChannelHandler handler) {
try {
Future<?> future;
Future<Throwable> future;
synchronized (this) {
checkDuplicateName(name);
@ -152,19 +156,24 @@ public class DefaultChannelPipeline implements ChannelPipeline {
addLast0(name, oldTail, newTail);
return this;
} else {
future = newTail.executor().submit(new Runnable() {
future = newTail.executor().submit(new AsyncPipelineModification() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addLast0(name, oldTail, newTail);
}
void doCall() {
checkDuplicateName(name);
addLast0(name, oldTail, newTail);
}
});
}
}
// Call Future.get() outside of synchronized block to prevent dead-lock
future.get();
Throwable result = future.get();
if (result != null) {
// re-throw exception that was caught
throw result;
}
return this;
} catch (RuntimeException e) {
throw e;
} catch (Error e) {
@ -173,7 +182,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new ChannelPipelineException(t);
}
return this;
}
private void addLast0(final String name, DefaultChannelHandlerContext oldTail, DefaultChannelHandlerContext newTail) {
@ -194,7 +202,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addBefore(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) {
try {
Future<?> future;
Future<Throwable> future;
synchronized (this) {
final DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
@ -205,23 +213,25 @@ public class DefaultChannelPipeline implements ChannelPipeline {
addBefore0(name, ctx, newCtx);
return this;
} else {
future = newCtx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addBefore0(name, ctx, newCtx);
}
}
});
future = newCtx.executor().submit(new AsyncPipelineModification() {
@Override
void doCall() {
checkDuplicateName(name);
addBefore0(name, ctx, newCtx);
}
});
}
}
// Call Future.get() outside of the synchronized to prevent dead-lock
future.get();
// Call Future.get() outside of synchronized block to prevent dead-lock
Throwable result = future.get();
if (result != null) {
// re-throw exception that was caught
throw result;
}
return this;
} catch (RuntimeException e) {
throw e;
@ -231,7 +241,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new ChannelPipelineException(t);
}
return this;
}
private void addBefore0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
@ -253,7 +262,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
public ChannelPipeline addAfter(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) {
try {
Future<?> future;
Future<Throwable> future;
synchronized (this) {
final DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
@ -267,18 +276,22 @@ public class DefaultChannelPipeline implements ChannelPipeline {
addAfter0(name, ctx, newCtx);
return this;
} else {
future = newCtx.executor().submit(new Runnable() {
future = newCtx.executor().submit(new AsyncPipelineModification() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addAfter0(name, ctx, newCtx);
}
void doCall() {
checkDuplicateName(name);
addAfter0(name, ctx, newCtx);
}
});
}
}
future.get();
// Call Future.get() outside of synchronized block to prevent dead-lock
Throwable result = future.get();
if (result != null) {
// re-throw exception that was caught
throw result;
}
return this;
} catch (RuntimeException e) {
@ -381,7 +394,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) {
try {
DefaultChannelHandlerContext context;
Future<?> future;
Future<Throwable> future;
synchronized (this) {
if (head == tail) {
return null;
@ -397,12 +410,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
removeLast0(oldTail);
return oldTail;
} else {
future = oldTail.executor().submit(new Runnable() {
future = oldTail.executor().submit(new AsyncPipelineModification() {
@Override
public void run() {
synchronized (oldTail) {
removeLast0(oldTail);
}
void doCall() {
removeLast0(oldTail);
}
});
context = oldTail;
@ -413,23 +425,26 @@ public class DefaultChannelPipeline implements ChannelPipeline {
remove0(ctx);
return ctx;
} else {
future = ctx.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
remove0(ctx);
}
}
future = ctx.executor().submit(new AsyncPipelineModification() {
@Override
void doCall() {
remove0(ctx);
}
});
context = ctx;
}
}
}
// call the Future.get() outside of the synchronization block to prevent from dead-lock
future.get();
// Call Future.get() outside of synchronized block to prevent dead-lock
Throwable result = future.get();
if (result != null) {
// re-throw exception that was caught
throw result;
}
return context;
} catch (RuntimeException e) {
throw e;
@ -464,7 +479,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelHandler removeLast() {
try {
Future<?> future;
Future<Throwable> future;
final DefaultChannelHandlerContext oldTail;
synchronized (this) {
if (head == tail) {
@ -475,19 +490,22 @@ public class DefaultChannelPipeline implements ChannelPipeline {
removeLast0(oldTail);
return oldTail.handler();
} else {
future = oldTail.executor().submit(new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
removeLast0(oldTail);
}
}
});
future = oldTail.executor().submit(new AsyncPipelineModification() {
@Override
void doCall() {
removeLast0(oldTail);
}
});
}
}
// call Future.get() outside of the synchronized block to prevent deadlock
future.get();
// Call Future.get() outside of synchronized block to prevent dead-lock
Throwable result = future.get();
if (result != null) {
// re-throw exception that was caught
throw result;
}
return oldTail.handler();
} catch (RuntimeException e) {
throw e;
@ -529,7 +547,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private ChannelHandler replace(final DefaultChannelHandlerContext ctx, final String newName, final ChannelHandler newHandler) {
try {
Future<?> future;
Future<Throwable> future;
synchronized (this) {
if (ctx == head) {
throw new IllegalArgumentException();
@ -547,17 +565,15 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return ctx.handler();
} else {
future = oldTail.executor().submit(new Runnable() {
future = oldTail.executor().submit(new AsyncPipelineModification() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
removeLast0(oldTail);
checkDuplicateName(newName);
addLast0(newName, tail, newTail);
}
void doCall() {
removeLast0(oldTail);
checkDuplicateName(newName);
addLast0(newName, tail, newTail);
}
});
}
} else {
@ -575,18 +591,24 @@ public class DefaultChannelPipeline implements ChannelPipeline {
replace0(ctx, newName, newCtx);
return ctx.handler();
} else {
future = newCtx.executor().submit(new Runnable() {
future = newCtx.executor().submit(new AsyncPipelineModification() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
replace0(ctx, newName, newCtx);
}
void doCall() {
replace0(ctx, newName, newCtx);
}
});
}
}
}
future.get();
// Call Future.get() outside of synchronized block to prevent dead-lock
Throwable result = future.get();
if (result != null) {
// re-throw exception that was caught
throw result;
}
return ctx.handler();
} catch (RuntimeException e) {
@ -1685,4 +1707,32 @@ public class DefaultChannelPipeline implements ChannelPipeline {
unsafe.flush(future);
}
}
/**
* Custom {@link Callable} implementation which will catch all {@link Throwable} which happens during execution of {@link AsyncPipelineModification#doCall()}
* and return them in the {@link Future}. This allows to re-throw them later.
*
* It also handles the right synchronization of the {@link AsyncPipelineModification#doCall()} method.
*
*/
private abstract class AsyncPipelineModification implements Callable<Throwable> {
@Override
public Throwable call() {
try {
synchronized (DefaultChannelPipeline.this) {
doCall();
}
} catch (Throwable t) {
return t;
}
return null;
}
/**
* Execute the modification
*/
abstract void doCall();
}
}