Let CombinedChannelDuplexHandler correctly handle exceptionCaught. Related to [#4528]

Motivation:

ChannelInboundHandler and ChannelOutboundHandler both can implement exceptionCaught(...) method and so we need to dispatch to both of them.

Modifications:

- Correctly first dispatch exceptionCaught to the ChannelInboundHandler but also make sure the next handler it will be dispatched to will be the ChannelOutboundHandler
- Add removeInboundHandler() and removeOutboundHandler() which allows to remove one of the combined handlers

Result:

Correctly handle events
This commit is contained in:
Norman Maurer 2016-01-10 22:19:55 +01:00
parent 8bfbb35979
commit d7d64137e0
2 changed files with 747 additions and 24 deletions

View File

@ -15,15 +15,28 @@
*/
package io.netty.channel;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress;
/**
* Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}.
*
*/
public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
extends ChannelDuplexHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class);
private DelegatingChannelHandlerContext inboundCtx;
private DelegatingChannelHandlerContext outboundCtx;
private volatile boolean handlerAdded;
private I inboundHandler;
private O outboundHandler;
@ -88,6 +101,28 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
return outboundHandler;
}
private void checkAdded() {
if (!handlerAdded) {
throw new IllegalStateException("handler not added to pipeline yet");
}
}
/**
* Removes the {@link ChannelInboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
*/
public final void removeInboundHandler() {
checkAdded();
inboundCtx.remove();
}
/**
* Removes the {@link ChannelOutboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
*/
public final void removeOutboundHandler() {
checkAdded();
outboundCtx.remove();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (inboundHandler == null) {
@ -96,67 +131,151 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
" if " + CombinedChannelDuplexHandler.class.getSimpleName() +
" was constructed with the default constructor.");
}
outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler);
inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) {
@SuppressWarnings("deprecation")
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
if (!outboundCtx.removed) {
try {
// We directly delegate to the ChannelOutboundHandler as this may override exceptionCaught(...)
// as well
outboundHandler.exceptionCaught(outboundCtx, cause);
} catch (Throwable error) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", error);
}
}
} else {
super.fireExceptionCaught(cause);
}
return this;
}
};
// The inboundCtx and outboundCtx were created and set now it's safe to call removeInboundHandler() and
// removeOutboundHandler().
handlerAdded = true;
try {
inboundHandler.handlerAdded(ctx);
inboundHandler.handlerAdded(inboundCtx);
} finally {
outboundHandler.handlerAdded(ctx);
outboundHandler.handlerAdded(outboundCtx);
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
try {
inboundHandler.handlerRemoved(ctx);
inboundCtx.remove();
} finally {
outboundHandler.handlerRemoved(ctx);
outboundCtx.remove();
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelRegistered(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelRegistered(inboundCtx);
} else {
inboundCtx.fireChannelRegistered();
}
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelUnregistered(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelUnregistered(inboundCtx);
} else {
inboundCtx.fireChannelUnregistered();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelActive(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelActive(inboundCtx);
} else {
inboundCtx.fireChannelActive();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelInactive(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelInactive(inboundCtx);
} else {
inboundCtx.fireChannelInactive();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
inboundHandler.exceptionCaught(ctx, cause);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.exceptionCaught(inboundCtx, cause);
} else {
inboundCtx.fireExceptionCaught(cause);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
inboundHandler.userEventTriggered(ctx, evt);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.userEventTriggered(inboundCtx, evt);
} else {
inboundCtx.fireUserEventTriggered(evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundHandler.channelRead(ctx, msg);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelRead(inboundCtx, msg);
} else {
inboundCtx.fireChannelRead(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelReadComplete(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelReadComplete(inboundCtx);
} else {
inboundCtx.fireChannelReadComplete();
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelWritabilityChanged(inboundCtx);
} else {
inboundCtx.fireChannelWritabilityChanged();
}
}
@Override
public void bind(
ChannelHandlerContext ctx,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
outboundHandler.bind(ctx, localAddress, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.bind(outboundCtx, localAddress, promise);
} else {
outboundCtx.bind(localAddress, promise);
}
}
@Override
@ -164,41 +283,321 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
outboundHandler.connect(ctx, remoteAddress, localAddress, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
} else {
outboundCtx.connect(localAddress, promise);
}
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
outboundHandler.disconnect(ctx, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.disconnect(outboundCtx, promise);
} else {
outboundCtx.disconnect(promise);
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
outboundHandler.close(ctx, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.close(outboundCtx, promise);
} else {
outboundCtx.close(promise);
}
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
outboundHandler.deregister(ctx, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.deregister(outboundCtx, promise);
} else {
outboundCtx.deregister(promise);
}
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
outboundHandler.read(ctx);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.read(outboundCtx);
} else {
outboundCtx.read();
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
outboundHandler.write(ctx, msg, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.write(outboundCtx, msg, promise);
} else {
outboundCtx.write(msg, promise);
}
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
outboundHandler.flush(ctx);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.flush(outboundCtx);
} else {
outboundCtx.flush();
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelWritabilityChanged(ctx);
private static class DelegatingChannelHandlerContext implements ChannelHandlerContext {
private final ChannelHandlerContext ctx;
private final ChannelHandler handler;
boolean removed;
DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
this.ctx = ctx;
this.handler = handler;
}
@Override
public Channel channel() {
return ctx.channel();
}
@Override
public EventExecutor executor() {
return ctx.executor();
}
@Override
public String name() {
return ctx.name();
}
@Override
public ChannelHandler handler() {
return ctx.handler();
}
@Override
public boolean isRemoved() {
return removed || ctx.isRemoved();
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
ctx.fireChannelRegistered();
return this;
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
ctx.fireChannelUnregistered();
return this;
}
@Override
public ChannelHandlerContext fireChannelActive() {
ctx.fireChannelActive();
return this;
}
@Override
public ChannelHandlerContext fireChannelInactive() {
ctx.fireChannelInactive();
return this;
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
ctx.fireExceptionCaught(cause);
return this;
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object event) {
ctx.fireUserEventTriggered(event);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
ctx.fireChannelRead(msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
ctx.fireChannelReadComplete();
return this;
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
ctx.fireChannelWritabilityChanged();
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return ctx.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return ctx.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return ctx.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return ctx.disconnect();
}
@Override
public ChannelFuture close() {
return ctx.close();
}
@Override
public ChannelFuture deregister() {
return ctx.deregister();
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return ctx.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return ctx.connect(remoteAddress, promise);
}
@Override
public ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
return ctx.disconnect(promise);
}
@Override
public ChannelFuture close(ChannelPromise promise) {
return ctx.close(promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return ctx.deregister();
}
@Override
public ChannelHandlerContext read() {
ctx.read();
return this;
}
@Override
public ChannelFuture write(Object msg) {
return ctx.write(msg);
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return ctx.write(msg, promise);
}
@Override
public ChannelHandlerContext flush() {
ctx.flush();
return this;
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return ctx.writeAndFlush(msg, promise);
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return ctx.writeAndFlush(msg);
}
@Override
public ChannelPipeline pipeline() {
return ctx.pipeline();
}
@Override
public ByteBufAllocator alloc() {
return ctx.alloc();
}
@Override
public ChannelPromise newPromise() {
return ctx.newPromise();
}
@Override
public ChannelProgressivePromise newProgressivePromise() {
return ctx.newProgressivePromise();
}
@Override
public ChannelFuture newSucceededFuture() {
return ctx.newSucceededFuture();
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return ctx.newFailedFuture(cause);
}
@Override
public ChannelPromise voidPromise() {
return ctx.voidPromise();
}
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
return ctx.attr(key);
}
final void remove() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
remove0();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
remove0();
}
});
}
}
private void remove0() {
if (!removed) {
removed = true;
try {
handler.handlerRemoved(this);
} catch (Throwable cause) {
fireExceptionCaught(new ChannelPipelineException(
handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
}
}
}
}
}

View File

@ -0,0 +1,324 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import static org.junit.Assert.*;
public class CombinedChannelDuplexHandlerTest {
private static final Object MSG = new Object();
private static final SocketAddress ADDRESS = new InetSocketAddress(0);
private enum Event {
REGISTERED,
UNREGISTERED,
ACTIVE,
INACTIVE,
CHANNEL_READ,
CHANNEL_READ_COMPLETE,
EXCEPTION_CAUGHT,
USER_EVENT_TRIGGERED,
CHANNEL_WRITABILITY_CHANGED,
HANDLER_ADDED,
HANDLER_REMOVED,
BIND,
CONNECT,
WRITE,
FLUSH,
READ,
REGISTER,
DEREGISTER,
CLOSE,
DISCONNECT
}
@Test(expected = IllegalStateException.class)
public void testInboundRemoveBeforeAdded() {
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
new ChannelInboundHandlerAdapter(), new ChannelOutboundHandlerAdapter());
handler.removeInboundHandler();
}
@Test(expected = IllegalStateException.class)
public void testOutboundRemoveBeforeAdded() {
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
new ChannelInboundHandlerAdapter(), new ChannelOutboundHandlerAdapter());
handler.removeOutboundHandler();
}
@Test(expected = IllegalArgumentException.class)
public void testInboundHandlerImplementsOutboundHandler() {
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
new ChannelDuplexHandler(), new ChannelOutboundHandlerAdapter());
}
@Test(expected = IllegalArgumentException.class)
public void testOutboundHandlerImplementsInbboundHandler() {
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
new ChannelInboundHandlerAdapter(), new ChannelDuplexHandler());
}
@Test(expected = IllegalStateException.class)
public void testInitNotCalledBeforeAdded() throws Exception {
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>() { };
handler.handlerAdded(null);
}
@Test
public void testExceptionCaughtBothCombinedHandlers() {
final Exception exception = new Exception();
final Queue<ChannelHandler> queue = new ArrayDeque<ChannelHandler>();
ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
assertSame(exception, cause);
queue.add(this);
ctx.fireExceptionCaught(cause);
}
};
ChannelOutboundHandler outboundHandler = new ChannelOutboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
assertSame(exception, cause);
queue.add(this);
ctx.fireExceptionCaught(cause);
}
};
ChannelInboundHandler lastHandler = new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
assertSame(exception, cause);
queue.add(this);
}
};
EmbeddedChannel channel = new EmbeddedChannel(
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
inboundHandler, outboundHandler), lastHandler);
channel.pipeline().fireExceptionCaught(exception);
assertFalse(channel.finish());
assertSame(inboundHandler, queue.poll());
assertSame(outboundHandler, queue.poll());
assertSame(lastHandler, queue.poll());
assertTrue(queue.isEmpty());
}
@Test
public void testInboundEvents() {
final Queue<Event> queue = new ArrayDeque<Event>();
ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.HANDLER_ADDED);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.HANDLER_REMOVED);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.REGISTERED);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.UNREGISTERED);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.ACTIVE);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.INACTIVE);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
queue.add(Event.CHANNEL_READ);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.CHANNEL_READ_COMPLETE);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
queue.add(Event.USER_EVENT_TRIGGERED);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.CHANNEL_WRITABILITY_CHANGED);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
queue.add(Event.EXCEPTION_CAUGHT);
}
};
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
inboundHandler, new ChannelOutboundHandlerAdapter());
EmbeddedChannel channel = new EmbeddedChannel(handler);
channel.pipeline().fireChannelWritabilityChanged();
channel.pipeline().fireUserEventTriggered(MSG);
channel.pipeline().fireChannelRead(MSG);
channel.pipeline().fireChannelReadComplete();
assertEquals(Event.HANDLER_ADDED, queue.poll());
assertEquals(Event.REGISTERED, queue.poll());
assertEquals(Event.ACTIVE, queue.poll());
assertEquals(Event.CHANNEL_WRITABILITY_CHANGED, queue.poll());
assertEquals(Event.USER_EVENT_TRIGGERED, queue.poll());
assertEquals(Event.CHANNEL_READ, queue.poll());
assertEquals(Event.CHANNEL_READ_COMPLETE, queue.poll());
handler.removeInboundHandler();
assertEquals(Event.HANDLER_REMOVED, queue.poll());
// These should not be handled by the inboundHandler anymore as it was removed before
channel.pipeline().fireChannelWritabilityChanged();
channel.pipeline().fireUserEventTriggered(MSG);
channel.pipeline().fireChannelRead(MSG);
channel.pipeline().fireChannelReadComplete();
// Should have not received any more events as it was removed before via removeInboundHandler()
assertTrue(queue.isEmpty());
assertTrue(channel.finish());
assertTrue(queue.isEmpty());
}
@Test
public void testOutboundEvents() {
final Queue<Event> queue = new ArrayDeque<Event>();
ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter();
ChannelOutboundHandler outboundHandler = new ChannelOutboundHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.HANDLER_ADDED);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.HANDLER_REMOVED);
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
queue.add(Event.BIND);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
queue.add(Event.CONNECT);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
queue.add(Event.DISCONNECT);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
queue.add(Event.CLOSE);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
queue.add(Event.DEREGISTER);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.READ);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
queue.add(Event.WRITE);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.FLUSH);
}
};
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
inboundHandler, outboundHandler);
EmbeddedChannel channel = new EmbeddedChannel();
channel.pipeline().addFirst(handler);
doOutboundOperations(channel);
assertEquals(Event.HANDLER_ADDED, queue.poll());
assertEquals(Event.BIND, queue.poll());
assertEquals(Event.CONNECT, queue.poll());
assertEquals(Event.WRITE, queue.poll());
assertEquals(Event.FLUSH, queue.poll());
assertEquals(Event.READ, queue.poll());
assertEquals(Event.CLOSE, queue.poll());
assertEquals(Event.CLOSE, queue.poll());
assertEquals(Event.DEREGISTER, queue.poll());
handler.removeOutboundHandler();
assertEquals(Event.HANDLER_REMOVED, queue.poll());
// These should not be handled by the inboundHandler anymore as it was removed before
doOutboundOperations(channel);
// Should have not received any more events as it was removed before via removeInboundHandler()
assertTrue(queue.isEmpty());
assertTrue(channel.finish());
assertTrue(queue.isEmpty());
}
private static void doOutboundOperations(Channel channel) {
channel.pipeline().bind(ADDRESS);
channel.pipeline().connect(ADDRESS);
channel.pipeline().write(MSG);
channel.pipeline().flush();
channel.pipeline().read();
channel.pipeline().disconnect();
channel.pipeline().close();
channel.pipeline().deregister();
}
}