[#1947] Handle RejectExecutionException graceful for outbound operations

This commit is contained in:
Norman Maurer 2013-10-24 15:10:32 +02:00
parent 9b779e50ba
commit 98541be4b7

View File

@ -405,12 +405,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
executor.execute(new Runnable() {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
});
}, promise);
}
return promise;
@ -443,12 +443,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
executor.execute(new Runnable() {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
});
}, promise);
}
return promise;
@ -477,7 +477,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
next.invokeDisconnect(promise);
}
} else {
executor.execute(new Runnable() {
safeExecute(executor, new Runnable() {
@Override
public void run() {
if (!channel().metadata().hasDisconnect()) {
@ -486,7 +486,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
next.invokeDisconnect(promise);
}
}
});
}, promise);
}
return promise;
@ -509,12 +509,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (executor.inEventLoop()) {
next.invokeClose(promise);
} else {
executor.execute(new Runnable() {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeClose(promise);
}
});
}, promise);
}
return promise;
@ -600,7 +600,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
};
}
executor.execute(task);
safeExecute(executor, task, channel.voidPromise());
}
return this;
@ -645,7 +645,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
buffer.incrementPendingOutboundBytes(size);
}
}
executor.execute(WriteTask.newInstance(next, msg, size, flush, promise));
safeExecute(executor, WriteTask.newInstance(next, msg, size, flush, promise), promise);
}
}
@ -784,6 +784,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return removed;
}
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise) {
try {
executor.execute(runnable);
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
static final class WriteTask implements Runnable {
private DefaultChannelHandlerContext ctx;
private Object msg;