From d7d64137e02c50811af3bc58dfb7d98a38ec9566 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 10 Jan 2016 22:19:55 +0100 Subject: [PATCH] Let CombinedChannelDuplexHandler correctly handle exceptionCaught. Related to [#4528] Motivation: ChannelInboundHandler and ChannelOutboundHandler both can implement exceptionCaught(...) method and so we need to dispatch to both of them. Modifications: - Correctly first dispatch exceptionCaught to the ChannelInboundHandler but also make sure the next handler it will be dispatched to will be the ChannelOutboundHandler - Add removeInboundHandler() and removeOutboundHandler() which allows to remove one of the combined handlers Result: Correctly handle events --- .../channel/CombinedChannelDuplexHandler.java | 447 +++++++++++++++++- .../CombinedChannelDuplexHandlerTest.java | 324 +++++++++++++ 2 files changed, 747 insertions(+), 24 deletions(-) create mode 100644 transport/src/test/java/io/netty/channel/CombinedChannelDuplexHandlerTest.java diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java index 4c8b0bdca5..0546b39de5 100644 --- a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java @@ -15,15 +15,28 @@ */ package io.netty.channel; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.OneTimeTask; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + import java.net.SocketAddress; /** * Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}. - * */ public class CombinedChannelDuplexHandler extends ChannelDuplexHandler { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class); + + private DelegatingChannelHandlerContext inboundCtx; + private DelegatingChannelHandlerContext outboundCtx; + private volatile boolean handlerAdded; + private I inboundHandler; private O outboundHandler; @@ -88,6 +101,28 @@ public class CombinedChannelDuplexHandler Attribute attr(AttributeKey key) { + return ctx.attr(key); + } + + final void remove() { + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + remove0(); + } else { + executor.execute(new OneTimeTask() { + @Override + public void run() { + remove0(); + } + }); + } + } + + private void remove0() { + if (!removed) { + removed = true; + try { + handler.handlerRemoved(this); + } catch (Throwable cause) { + fireExceptionCaught(new ChannelPipelineException( + handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause)); + } + } + } } } diff --git a/transport/src/test/java/io/netty/channel/CombinedChannelDuplexHandlerTest.java b/transport/src/test/java/io/netty/channel/CombinedChannelDuplexHandlerTest.java new file mode 100644 index 0000000000..0bcf0a7a15 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/CombinedChannelDuplexHandlerTest.java @@ -0,0 +1,324 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayDeque; +import java.util.Queue; + +import static org.junit.Assert.*; + +public class CombinedChannelDuplexHandlerTest { + + private static final Object MSG = new Object(); + private static final SocketAddress ADDRESS = new InetSocketAddress(0); + + private enum Event { + REGISTERED, + UNREGISTERED, + ACTIVE, + INACTIVE, + CHANNEL_READ, + CHANNEL_READ_COMPLETE, + EXCEPTION_CAUGHT, + USER_EVENT_TRIGGERED, + CHANNEL_WRITABILITY_CHANGED, + HANDLER_ADDED, + HANDLER_REMOVED, + BIND, + CONNECT, + WRITE, + FLUSH, + READ, + REGISTER, + DEREGISTER, + CLOSE, + DISCONNECT + } + + @Test(expected = IllegalStateException.class) + public void testInboundRemoveBeforeAdded() { + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler( + new ChannelInboundHandlerAdapter(), new ChannelOutboundHandlerAdapter()); + handler.removeInboundHandler(); + } + + @Test(expected = IllegalStateException.class) + public void testOutboundRemoveBeforeAdded() { + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler( + new ChannelInboundHandlerAdapter(), new ChannelOutboundHandlerAdapter()); + handler.removeOutboundHandler(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInboundHandlerImplementsOutboundHandler() { + new CombinedChannelDuplexHandler( + new ChannelDuplexHandler(), new ChannelOutboundHandlerAdapter()); + } + + @Test(expected = IllegalArgumentException.class) + public void testOutboundHandlerImplementsInbboundHandler() { + new CombinedChannelDuplexHandler( + new ChannelInboundHandlerAdapter(), new ChannelDuplexHandler()); + } + + @Test(expected = IllegalStateException.class) + public void testInitNotCalledBeforeAdded() throws Exception { + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler() { }; + handler.handlerAdded(null); + } + + @Test + public void testExceptionCaughtBothCombinedHandlers() { + final Exception exception = new Exception(); + final Queue queue = new ArrayDeque(); + + ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + assertSame(exception, cause); + queue.add(this); + ctx.fireExceptionCaught(cause); + } + }; + ChannelOutboundHandler outboundHandler = new ChannelOutboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + assertSame(exception, cause); + queue.add(this); + ctx.fireExceptionCaught(cause); + } + }; + ChannelInboundHandler lastHandler = new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + assertSame(exception, cause); + queue.add(this); + } + }; + EmbeddedChannel channel = new EmbeddedChannel( + new CombinedChannelDuplexHandler( + inboundHandler, outboundHandler), lastHandler); + channel.pipeline().fireExceptionCaught(exception); + assertFalse(channel.finish()); + assertSame(inboundHandler, queue.poll()); + assertSame(outboundHandler, queue.poll()); + assertSame(lastHandler, queue.poll()); + assertTrue(queue.isEmpty()); + } + + @Test + public void testInboundEvents() { + final Queue queue = new ArrayDeque(); + + ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.HANDLER_ADDED); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.HANDLER_REMOVED); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.REGISTERED); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.UNREGISTERED); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.ACTIVE); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.INACTIVE); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + queue.add(Event.CHANNEL_READ); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.CHANNEL_READ_COMPLETE); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + queue.add(Event.USER_EVENT_TRIGGERED); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.CHANNEL_WRITABILITY_CHANGED); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + queue.add(Event.EXCEPTION_CAUGHT); + } + }; + + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler( + inboundHandler, new ChannelOutboundHandlerAdapter()); + + EmbeddedChannel channel = new EmbeddedChannel(handler); + channel.pipeline().fireChannelWritabilityChanged(); + channel.pipeline().fireUserEventTriggered(MSG); + channel.pipeline().fireChannelRead(MSG); + channel.pipeline().fireChannelReadComplete(); + + assertEquals(Event.HANDLER_ADDED, queue.poll()); + assertEquals(Event.REGISTERED, queue.poll()); + assertEquals(Event.ACTIVE, queue.poll()); + assertEquals(Event.CHANNEL_WRITABILITY_CHANGED, queue.poll()); + assertEquals(Event.USER_EVENT_TRIGGERED, queue.poll()); + assertEquals(Event.CHANNEL_READ, queue.poll()); + assertEquals(Event.CHANNEL_READ_COMPLETE, queue.poll()); + + handler.removeInboundHandler(); + assertEquals(Event.HANDLER_REMOVED, queue.poll()); + + // These should not be handled by the inboundHandler anymore as it was removed before + channel.pipeline().fireChannelWritabilityChanged(); + channel.pipeline().fireUserEventTriggered(MSG); + channel.pipeline().fireChannelRead(MSG); + channel.pipeline().fireChannelReadComplete(); + + // Should have not received any more events as it was removed before via removeInboundHandler() + assertTrue(queue.isEmpty()); + assertTrue(channel.finish()); + assertTrue(queue.isEmpty()); + } + + @Test + public void testOutboundEvents() { + final Queue queue = new ArrayDeque(); + + ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter(); + ChannelOutboundHandler outboundHandler = new ChannelOutboundHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.HANDLER_ADDED); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.HANDLER_REMOVED); + } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) + throws Exception { + queue.add(Event.BIND); + } + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) throws Exception { + queue.add(Event.CONNECT); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + queue.add(Event.DISCONNECT); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + queue.add(Event.CLOSE); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + queue.add(Event.DEREGISTER); + } + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.READ); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + queue.add(Event.WRITE); + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.FLUSH); + } + }; + + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler( + inboundHandler, outboundHandler); + + EmbeddedChannel channel = new EmbeddedChannel(); + channel.pipeline().addFirst(handler); + + doOutboundOperations(channel); + + assertEquals(Event.HANDLER_ADDED, queue.poll()); + assertEquals(Event.BIND, queue.poll()); + assertEquals(Event.CONNECT, queue.poll()); + assertEquals(Event.WRITE, queue.poll()); + assertEquals(Event.FLUSH, queue.poll()); + assertEquals(Event.READ, queue.poll()); + assertEquals(Event.CLOSE, queue.poll()); + assertEquals(Event.CLOSE, queue.poll()); + assertEquals(Event.DEREGISTER, queue.poll()); + + handler.removeOutboundHandler(); + assertEquals(Event.HANDLER_REMOVED, queue.poll()); + + // These should not be handled by the inboundHandler anymore as it was removed before + doOutboundOperations(channel); + + // Should have not received any more events as it was removed before via removeInboundHandler() + assertTrue(queue.isEmpty()); + assertTrue(channel.finish()); + assertTrue(queue.isEmpty()); + } + + private static void doOutboundOperations(Channel channel) { + channel.pipeline().bind(ADDRESS); + channel.pipeline().connect(ADDRESS); + channel.pipeline().write(MSG); + channel.pipeline().flush(); + channel.pipeline().read(); + channel.pipeline().disconnect(); + channel.pipeline().close(); + channel.pipeline().deregister(); + } +}