No need for a custom Runnable implementation, just use EventExecutor.submit(..).get(). Thanks @trusting for review
This commit is contained in:
parent
ecea558e03
commit
6b637ab22f
@ -89,19 +89,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
addFirst0(name, handler, nextCtx, newCtx);
|
||||
} else {
|
||||
ChannelPipelineModificationRunnable runnable = new ChannelPipelineModificationRunnable() {
|
||||
try {
|
||||
newCtx.executor().submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
checkDuplicateName(name);
|
||||
|
||||
@Override
|
||||
protected void runTask() {
|
||||
checkDuplicateName(name);
|
||||
|
||||
addFirst0(name, handler, nextCtx, newCtx);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
newCtx.executor().execute(runnable);
|
||||
runnable.await();
|
||||
addFirst0(name, handler, nextCtx, newCtx);
|
||||
}
|
||||
}).get();
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelException(t);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return this;
|
||||
@ -134,19 +135,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) {
|
||||
addLast0(name, handler, oldTail, newTail);
|
||||
} else {
|
||||
ChannelPipelineModificationRunnable runnable = new ChannelPipelineModificationRunnable() {
|
||||
try {
|
||||
newTail.executor().submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
checkDuplicateName(name);
|
||||
|
||||
@Override
|
||||
protected void runTask() {
|
||||
checkDuplicateName(name);
|
||||
|
||||
addLast0(name, handler, oldTail, newTail);
|
||||
}
|
||||
|
||||
};
|
||||
addLast0(name, handler, oldTail, newTail);
|
||||
}
|
||||
}).get();
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelException(t);
|
||||
}
|
||||
|
||||
newTail.executor().execute(runnable);
|
||||
runnable.await();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
@ -176,19 +178,19 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
addBefore0(name, handler, ctx, newCtx);
|
||||
} else {
|
||||
ChannelPipelineModificationRunnable runnable = new ChannelPipelineModificationRunnable() {
|
||||
try {
|
||||
newCtx.executor().submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
checkDuplicateName(name);
|
||||
|
||||
@Override
|
||||
protected void runTask() {
|
||||
checkDuplicateName(name);
|
||||
|
||||
addBefore0(name, handler, ctx, newCtx);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
newCtx.executor().execute(runnable);
|
||||
runnable.await();
|
||||
addBefore0(name, handler, ctx, newCtx);
|
||||
}
|
||||
}).get();
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelException(t);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
@ -220,19 +222,19 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
addAfter0(name, handler, ctx, newCtx);
|
||||
} else {
|
||||
ChannelPipelineModificationRunnable runnable = new ChannelPipelineModificationRunnable() {
|
||||
try {
|
||||
newCtx.executor().submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
checkDuplicateName(name);
|
||||
|
||||
@Override
|
||||
protected void runTask() {
|
||||
checkDuplicateName(name);
|
||||
|
||||
addAfter0(name, handler, ctx, newCtx);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
newCtx.executor().execute(runnable);
|
||||
runnable.await();
|
||||
addAfter0(name, handler, ctx, newCtx);
|
||||
}
|
||||
}).get();
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelException(t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -338,18 +340,18 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) {
|
||||
remove0(ctx);
|
||||
} else {
|
||||
ChannelPipelineModificationRunnable runnable = new ChannelPipelineModificationRunnable() {
|
||||
try {
|
||||
ctx.executor().submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
remove0(ctx);
|
||||
|
||||
@Override
|
||||
protected void runTask() {
|
||||
remove0(ctx);
|
||||
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
ctx.executor().execute(runnable);
|
||||
runnable.await();
|
||||
}
|
||||
}).get();
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelException(t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -386,17 +388,19 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) {
|
||||
removeLast0(oldTail);
|
||||
} else {
|
||||
ChannelPipelineModificationRunnable runnable = new ChannelPipelineModificationRunnable() {
|
||||
try {
|
||||
oldTail.executor().submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
removeLast0(oldTail);
|
||||
|
||||
@Override
|
||||
protected void runTask() {
|
||||
removeLast0(oldTail);
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
}).get();
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelException(t);
|
||||
}
|
||||
|
||||
oldTail.executor().execute(runnable);
|
||||
runnable.await();
|
||||
}
|
||||
|
||||
return oldTail.handler();
|
||||
@ -449,16 +453,18 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
|
||||
replace0(ctx, newName, newHandler, newCtx);
|
||||
} else {
|
||||
ChannelPipelineModificationRunnable runnable = new ChannelPipelineModificationRunnable() {
|
||||
try {
|
||||
newCtx.executor().submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
replace0(ctx, newName, newHandler, newCtx);
|
||||
|
||||
@Override
|
||||
protected void runTask() {
|
||||
replace0(ctx, newName, newHandler, newCtx);
|
||||
}
|
||||
|
||||
};
|
||||
newCtx.executor().execute(runnable);
|
||||
runnable.await();
|
||||
}
|
||||
}).get();
|
||||
} catch (Throwable t) {
|
||||
throw new ChannelException(t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -1551,43 +1557,4 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
unsafe.flush(future);
|
||||
}
|
||||
}
|
||||
|
||||
private abstract class ChannelPipelineModificationRunnable implements Runnable {
|
||||
private ChannelException cause;
|
||||
|
||||
@Override
|
||||
public final void run() {
|
||||
try {
|
||||
runTask();
|
||||
|
||||
} catch (Throwable t) {
|
||||
if (t instanceof ChannelException) {
|
||||
cause = (ChannelException) t;
|
||||
} else {
|
||||
this.cause = new ChannelException(t);
|
||||
}
|
||||
} finally {
|
||||
synchronized (ChannelPipelineModificationRunnable.this) {
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected abstract void runTask();
|
||||
|
||||
void await() {
|
||||
try {
|
||||
synchronized (ChannelPipelineModificationRunnable.this) {
|
||||
wait();
|
||||
}
|
||||
if (cause != null) {
|
||||
throw cause;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user