[#3780] Handle ChannelInitializer exception in exceptionCaught()
Motivation: At the moment we directly closed the Channel when an exception accoured durring initChannel(...) without giving the user any way to do extra or special handling. Modifications: Handle the exception in exceptionCaught(...) of the ChannelInitializer which will by default log and close the Channel. This way the user can override this. Result: More felixible handling of exceptions.
This commit is contained in:
parent
287ac6d328
commit
c9e71aafbd
@ -56,29 +56,33 @@ public abstract class ChannelInitializer<C extends Channel> extends ChannelInbou
|
|||||||
* will be removed from the {@link ChannelPipeline} of the {@link Channel}.
|
* will be removed from the {@link ChannelPipeline} of the {@link Channel}.
|
||||||
*
|
*
|
||||||
* @param ch the {@link Channel} which was registered.
|
* @param ch the {@link Channel} which was registered.
|
||||||
* @throws Exception is thrown if an error occurs. In that case the {@link Channel} will be closed.
|
* @throws Exception is thrown if an error occurs. In that case it will be handled by
|
||||||
|
* {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default close
|
||||||
|
* the {@link Channel}.
|
||||||
*/
|
*/
|
||||||
protected abstract void initChannel(C ch) throws Exception;
|
protected abstract void initChannel(C ch) throws Exception;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||||
ChannelPipeline pipeline = ctx.pipeline();
|
initChannel((C) ctx.channel());
|
||||||
boolean success = false;
|
ctx.pipeline().remove(this);
|
||||||
|
ctx.fireChannelRegistered();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle the {@link Throwable} by logging and closing the {@link Channel}. Sub-classes may override this.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
|
||||||
try {
|
try {
|
||||||
initChannel((C) ctx.channel());
|
ChannelPipeline pipeline = ctx.pipeline();
|
||||||
pipeline.remove(this);
|
|
||||||
ctx.fireChannelRegistered();
|
|
||||||
success = true;
|
|
||||||
} catch (Throwable t) {
|
|
||||||
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
|
|
||||||
} finally {
|
|
||||||
if (pipeline.context(this) != null) {
|
if (pipeline.context(this) != null) {
|
||||||
pipeline.remove(this);
|
pipeline.remove(this);
|
||||||
}
|
}
|
||||||
if (!success) {
|
} finally {
|
||||||
ctx.close();
|
ctx.close();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,7 @@ import io.netty.bootstrap.ServerBootstrap;
|
|||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandler.Sharable;
|
import io.netty.channel.ChannelHandler.Sharable;
|
||||||
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.channel.local.LocalAddress;
|
import io.netty.channel.local.LocalAddress;
|
||||||
import io.netty.channel.local.LocalChannel;
|
import io.netty.channel.local.LocalChannel;
|
||||||
import io.netty.channel.local.LocalEventLoopGroup;
|
import io.netty.channel.local.LocalEventLoopGroup;
|
||||||
@ -535,6 +536,29 @@ public class DefaultChannelPipelineTest {
|
|||||||
assertNull(pipeline.last());
|
assertNull(pipeline.last());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5000)
|
||||||
|
public void testChannelInitializerException() throws Exception {
|
||||||
|
final IllegalStateException exception = new IllegalStateException();
|
||||||
|
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
|
throw exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
super.exceptionCaught(ctx, cause);
|
||||||
|
error.set(cause);
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
latch.await();
|
||||||
|
assertFalse(channel.isActive());
|
||||||
|
assertSame(exception, error.get());
|
||||||
|
}
|
||||||
|
|
||||||
private static int next(AbstractChannelHandlerContext ctx) {
|
private static int next(AbstractChannelHandlerContext ctx) {
|
||||||
AbstractChannelHandlerContext next = ctx.next;
|
AbstractChannelHandlerContext next = ctx.next;
|
||||||
if (next == null) {
|
if (next == null) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user