Let CombinedChannelDuplexHandler correctly handle exceptionCaught. Related to [#4528]

Motivation:

ChannelInboundHandler and ChannelOutboundHandler both can implement exceptionCaught(...) method and so we need to dispatch to both of them.

Modifications:

- Correctly first dispatch exceptionCaught to the ChannelInboundHandler but also make sure the next handler it will be dispatched to will be the ChannelOutboundHandler
- Add removeInboundHandler() and removeOutboundHandler() which allows to remove one of the combined handlers
- Let *Codec extends it and not ChannelHandlerAppender
- Remove ChannelHandlerAppender

Result:

Correctly handle events and also have same behavior as in 4.0
This commit is contained in:
Norman Maurer 2016-01-10 22:19:55 +01:00
parent 9e76b5319e
commit e969b6917c
11 changed files with 781 additions and 296 deletions

View File

@ -17,11 +17,10 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.util.internal.OneTimeTask;
import java.util.ArrayDeque;
import java.util.List;
@ -42,7 +41,8 @@ import java.util.concurrent.atomic.AtomicLong;
*
* @see HttpServerCodec
*/
public final class HttpClientCodec extends ChannelHandlerAppender implements HttpClientUpgradeHandler.SourceCodec {
public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>
implements HttpClientUpgradeHandler.SourceCodec {
/** A queue that is used for correlating a request and a response. */
private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
@ -83,8 +83,7 @@ public final class HttpClientCodec extends ChannelHandlerAppender implements Htt
public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
boolean validateHeaders) {
add(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders));
add(new Encoder());
init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new Encoder());
this.failOnMissingResponse = failOnMissingResponse;
}
@ -95,36 +94,15 @@ public final class HttpClientCodec extends ChannelHandlerAppender implements Htt
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
final ChannelPipeline p = ctx.pipeline();
// Remove the decoder later so that the decoder can enter the 'UPGRADED' state and forward the remaining data.
ctx.executor().execute(new OneTimeTask() {
@Override
public void run() {
p.remove(decoder());
}
});
p.remove(encoder());
}
/**
* Returns the encoder of this codec.
*/
public HttpRequestEncoder encoder() {
return handlerAt(1);
}
/**
* Returns the decoder of this codec.
*/
public HttpResponseDecoder decoder() {
return handlerAt(0);
p.remove(this);
}
public void setSingleDecode(boolean singleDecode) {
decoder().setSingleDecode(singleDecode);
inboundHandler().setSingleDecode(singleDecode);
}
public boolean isSingleDecode() {
return decoder().isSingleDecode();
return inboundHandler().isSingleDecode();
}
private final class Encoder extends HttpRequestEncoder {

View File

@ -15,9 +15,8 @@
*/
package io.netty.handler.codec.http;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelDuplexHandler;
/**
* A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder}
@ -25,8 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
*
* @see HttpClientCodec
*/
public final class HttpServerCodec extends ChannelHandlerAppender implements
HttpServerUpgradeHandler.SourceCodec {
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec {
/**
* Creates a new instance with the default decoder options
@ -58,21 +57,6 @@ public final class HttpServerCodec extends ChannelHandlerAppender implements
*/
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
ctx.pipeline().remove(HttpRequestDecoder.class);
ctx.pipeline().remove(HttpResponseEncoder.class);
}
/**
* Returns the encoder of this codec.
*/
public HttpResponseEncoder encoder() {
return handlerAt(1);
}
/**
* Returns the decoder of this codec.
*/
public HttpRequestDecoder decoder() {
return handlerAt(0);
ctx.pipeline().remove(this);
}
}

View File

@ -15,12 +15,12 @@
*/
package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.CombinedChannelDuplexHandler;
/**
* A combination of {@link SpdyHttpDecoder} and {@link SpdyHttpEncoder}
*/
public final class SpdyHttpCodec extends ChannelHandlerAppender {
public final class SpdyHttpCodec extends CombinedChannelDuplexHandler<SpdyHttpDecoder, SpdyHttpEncoder> {
/**
* Creates a new instance with the specified decoder options.
*/

View File

@ -16,8 +16,8 @@
package io.netty.handler.codec.memcache.binary;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.handler.codec.memcache.LastMemcacheContent;
@ -35,7 +35,8 @@ import java.util.concurrent.atomic.AtomicLong;
* content, which defaults to 8192. This chunk size is the maximum, so if smaller chunks arrive they
* will be passed up the pipeline and not queued up to the chunk size.
*/
public final class BinaryMemcacheClientCodec extends ChannelHandlerAppender {
public final class BinaryMemcacheClientCodec extends
CombinedChannelDuplexHandler<BinaryMemcacheResponseDecoder, BinaryMemcacheRequestEncoder> {
private final boolean failOnMissingResponse;
private final AtomicLong requestResponseCounter = new AtomicLong();
@ -64,8 +65,7 @@ public final class BinaryMemcacheClientCodec extends ChannelHandlerAppender {
*/
public BinaryMemcacheClientCodec(int decodeChunkSize, boolean failOnMissingResponse) {
this.failOnMissingResponse = failOnMissingResponse;
add(new Decoder(decodeChunkSize));
add(new Encoder());
init(new Decoder(decodeChunkSize), new Encoder());
}
private final class Encoder extends BinaryMemcacheRequestEncoder {

View File

@ -15,7 +15,7 @@
*/
package io.netty.handler.codec.memcache.binary;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.CombinedChannelDuplexHandler;
/**
* The full server codec that combines the correct encoder and decoder.
@ -24,14 +24,14 @@ import io.netty.channel.ChannelHandlerAppender;
* Internally, it combines the {@link BinaryMemcacheRequestDecoder} and the
* {@link BinaryMemcacheResponseEncoder} to request decoding and response encoding.
*/
public class BinaryMemcacheServerCodec extends ChannelHandlerAppender {
public class BinaryMemcacheServerCodec extends
CombinedChannelDuplexHandler<BinaryMemcacheRequestDecoder, BinaryMemcacheResponseEncoder> {
public BinaryMemcacheServerCodec() {
this(AbstractBinaryMemcacheDecoder.DEFAULT_MAX_CHUNK_SIZE);
}
public BinaryMemcacheServerCodec(int decodeChunkSize) {
add(new BinaryMemcacheRequestDecoder(decodeChunkSize));
add(new BinaryMemcacheResponseEncoder());
super(new BinaryMemcacheRequestDecoder(decodeChunkSize), new BinaryMemcacheResponseEncoder());
}
}

View File

@ -101,12 +101,12 @@ public final class HttpProxyHandler extends ProxyHandler {
@Override
protected void removeEncoder(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(codec.encoder());
codec.removeOutboundHandler();
}
@Override
protected void removeDecoder(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(codec.decoder());
codec.removeInboundHandler();
}
@Override

View File

@ -83,7 +83,7 @@ final class HttpProxyServer extends ProxyServer {
}
ctx.pipeline().remove(HttpObjectAggregator.class);
ctx.pipeline().remove(HttpRequestDecoder.class);
ctx.pipeline().get(HttpServerCodec.class).removeInboundHandler();
boolean authzSuccess = false;
if (username != null) {
@ -128,7 +128,7 @@ final class HttpProxyServer extends ProxyServer {
}
ctx.write(res);
ctx.pipeline().remove(HttpResponseEncoder.class);
ctx.pipeline().get(HttpServerCodec.class).removeOutboundHandler();
return true;
}
@ -158,7 +158,7 @@ final class HttpProxyServer extends ProxyServer {
}
ctx.write(res);
ctx.pipeline().remove(HttpResponseEncoder.class);
ctx.pipeline().get(HttpServerCodec.class).removeOutboundHandler();
if (sendGreeting) {
ctx.write(Unpooled.copiedBuffer("0\n", CharsetUtil.US_ASCII));

View File

@ -1,205 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.ArrayList;
import java.util.List;
/**
* A {@link ChannelHandler} that appends the specified {@link ChannelHandler}s right next to itself.
* By default, it removes itself from the {@link ChannelPipeline} once the specified {@link ChannelHandler}s
* are added. Optionally, you can keep it in the {@link ChannelPipeline} by specifying a {@code boolean}
* parameter at construction time.
*/
public class ChannelHandlerAppender extends ChannelInboundHandlerAdapter {
private static final class Entry {
final String name;
final ChannelHandler handler;
Entry(String name, ChannelHandler handler) {
this.name = name;
this.handler = handler;
}
}
private final boolean selfRemoval;
private final List<Entry> handlers = new ArrayList<Entry>();
private boolean added;
/**
* Creates a new uninitialized instance. A class that extends this handler must invoke
* {@link #add(ChannelHandler...)} before adding this handler into a {@link ChannelPipeline}.
*/
protected ChannelHandlerAppender() {
this(true);
}
/**
* Creates a new uninitialized instance. A class that extends this handler must invoke
* {@link #add(ChannelHandler...)} before adding this handler into a {@link ChannelPipeline}.
*
* @param selfRemoval {@code true} to remove itself from the {@link ChannelPipeline} after appending
* the {@link ChannelHandler}s specified via {@link #add(ChannelHandler...)}.
*/
protected ChannelHandlerAppender(boolean selfRemoval) {
this.selfRemoval = selfRemoval;
}
/**
* Creates a new instance that appends the specified {@link ChannelHandler}s right next to itself.
*/
public ChannelHandlerAppender(Iterable<? extends ChannelHandler> handlers) {
this(true, handlers);
}
/**
* Creates a new instance that appends the specified {@link ChannelHandler}s right next to itself.
*/
public ChannelHandlerAppender(ChannelHandler... handlers) {
this(true, handlers);
}
/**
* Creates a new instance that appends the specified {@link ChannelHandler}s right next to itself.
*
* @param selfRemoval {@code true} to remove itself from the {@link ChannelPipeline} after appending
* the specified {@link ChannelHandler}s
*/
public ChannelHandlerAppender(boolean selfRemoval, Iterable<? extends ChannelHandler> handlers) {
this.selfRemoval = selfRemoval;
add(handlers);
}
/**
* Creates a new instance that appends the specified {@link ChannelHandler}s right next to itself.
*
* @param selfRemoval {@code true} to remove itself from the {@link ChannelPipeline} after appending
* the specified {@link ChannelHandler}s
*/
public ChannelHandlerAppender(boolean selfRemoval, ChannelHandler... handlers) {
this.selfRemoval = selfRemoval;
add(handlers);
}
/**
* Adds the specified handler to the list of the appended handlers.
*
* @param name the name of the appended handler. {@code null} to auto-generate
* @param handler the handler to append
*
* @throws IllegalStateException if {@link ChannelHandlerAppender} has been added to the pipeline already
*/
protected final ChannelHandlerAppender add(String name, ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
if (added) {
throw new IllegalStateException("added to the pipeline already");
}
handlers.add(new Entry(name, handler));
return this;
}
/**
* Adds the specified handler to the list of the appended handlers with the auto-generated handler name.
*
* @param handler the handler to append
*
* @throws IllegalStateException if {@link ChannelHandlerAppender} has been added to the pipeline already
*/
protected final ChannelHandlerAppender add(ChannelHandler handler) {
return add(null, handler);
}
/**
* Adds the specified handlers to the list of the appended handlers. The handlers' names are auto-generated.
*
* @throws IllegalStateException if {@link ChannelHandlerAppender} has been added to the pipeline already
*/
protected final ChannelHandlerAppender add(Iterable<? extends ChannelHandler> handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
add(h);
}
return this;
}
/**
* Adds the specified handlers to the list of the appended handlers. The handlers' names are auto-generated.
*
* @throws IllegalStateException if {@link ChannelHandlerAppender} has been added to the pipeline already
*/
protected final ChannelHandlerAppender add(ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
add(h);
}
return this;
}
/**
* Returns the {@code index}-th appended handler.
*/
@SuppressWarnings("unchecked")
protected final <T extends ChannelHandler> T handlerAt(int index) {
return (T) handlers.get(index).handler;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
added = true;
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
DefaultChannelPipeline pipeline = (DefaultChannelPipeline) dctx.pipeline();
String name = dctx.name();
try {
for (Entry e: handlers) {
String oldName = name;
if (e.name == null) {
name = pipeline.generateName(e.handler);
} else {
name = e.name;
}
// Note that we do not use dctx.invoker() because it raises an IllegalStateExxception
// if the Channel is not registered yet.
pipeline.addAfter(dctx.invoker, oldName, name, e.handler);
}
} finally {
if (selfRemoval) {
pipeline.remove(this);
}
}
}
}

View File

@ -15,15 +15,28 @@
*/
package io.netty.channel;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress;
/**
* @deprecated Use {@link ChannelHandlerAppender} instead.
* Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}.
*/
@Deprecated
public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
extends ChannelDuplexHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class);
private DelegatingChannelHandlerContext inboundCtx;
private DelegatingChannelHandlerContext outboundCtx;
private volatile boolean handlerAdded;
private I inboundHandler;
private O outboundHandler;
@ -88,6 +101,28 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
return outboundHandler;
}
private void checkAdded() {
if (!handlerAdded) {
throw new IllegalStateException("handler not added to pipeline yet");
}
}
/**
* Removes the {@link ChannelInboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
*/
public final void removeInboundHandler() {
checkAdded();
inboundCtx.remove();
}
/**
* Removes the {@link ChannelOutboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
*/
public final void removeOutboundHandler() {
checkAdded();
outboundCtx.remove();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (inboundHandler == null) {
@ -96,67 +131,151 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
" if " + CombinedChannelDuplexHandler.class.getSimpleName() +
" was constructed with the default constructor.");
}
outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler);
inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) {
@SuppressWarnings("deprecation")
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
if (!outboundCtx.removed) {
try {
inboundHandler.handlerAdded(ctx);
// We directly delegate to the ChannelOutboundHandler as this may override exceptionCaught(...)
// as well
outboundHandler.exceptionCaught(outboundCtx, cause);
} catch (Throwable error) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", error);
}
}
} else {
super.fireExceptionCaught(cause);
}
return this;
}
};
// The inboundCtx and outboundCtx were created and set now it's safe to call removeInboundHandler() and
// removeOutboundHandler().
handlerAdded = true;
try {
inboundHandler.handlerAdded(inboundCtx);
} finally {
outboundHandler.handlerAdded(ctx);
outboundHandler.handlerAdded(outboundCtx);
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
try {
inboundHandler.handlerRemoved(ctx);
inboundCtx.remove();
} finally {
outboundHandler.handlerRemoved(ctx);
outboundCtx.remove();
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelRegistered(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelRegistered(inboundCtx);
} else {
inboundCtx.fireChannelRegistered();
}
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelUnregistered(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelUnregistered(inboundCtx);
} else {
inboundCtx.fireChannelUnregistered();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelActive(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelActive(inboundCtx);
} else {
inboundCtx.fireChannelActive();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelInactive(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelInactive(inboundCtx);
} else {
inboundCtx.fireChannelInactive();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
inboundHandler.exceptionCaught(ctx, cause);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.exceptionCaught(inboundCtx, cause);
} else {
inboundCtx.fireExceptionCaught(cause);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
inboundHandler.userEventTriggered(ctx, evt);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.userEventTriggered(inboundCtx, evt);
} else {
inboundCtx.fireUserEventTriggered(evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundHandler.channelRead(ctx, msg);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelRead(inboundCtx, msg);
} else {
inboundCtx.fireChannelRead(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelReadComplete(ctx);
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelReadComplete(inboundCtx);
} else {
inboundCtx.fireChannelReadComplete();
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
assert ctx == inboundCtx.ctx;
if (!inboundCtx.removed) {
inboundHandler.channelWritabilityChanged(inboundCtx);
} else {
inboundCtx.fireChannelWritabilityChanged();
}
}
@Override
public void bind(
ChannelHandlerContext ctx,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
outboundHandler.bind(ctx, localAddress, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.bind(outboundCtx, localAddress, promise);
} else {
outboundCtx.bind(localAddress, promise);
}
}
@Override
@ -164,41 +283,326 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
outboundHandler.connect(ctx, remoteAddress, localAddress, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
} else {
outboundCtx.connect(localAddress, promise);
}
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
outboundHandler.disconnect(ctx, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.disconnect(outboundCtx, promise);
} else {
outboundCtx.disconnect(promise);
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
outboundHandler.close(ctx, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.close(outboundCtx, promise);
} else {
outboundCtx.close(promise);
}
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
outboundHandler.deregister(ctx, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.deregister(outboundCtx, promise);
} else {
outboundCtx.deregister(promise);
}
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
outboundHandler.read(ctx);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.read(outboundCtx);
} else {
outboundCtx.read();
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
outboundHandler.write(ctx, msg, promise);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.write(outboundCtx, msg, promise);
} else {
outboundCtx.write(msg, promise);
}
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
outboundHandler.flush(ctx);
assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) {
outboundHandler.flush(outboundCtx);
} else {
outboundCtx.flush();
}
}
private static class DelegatingChannelHandlerContext implements ChannelHandlerContext {
private final ChannelHandlerContext ctx;
private final ChannelHandler handler;
boolean removed;
DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
this.ctx = ctx;
this.handler = handler;
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
inboundHandler.channelWritabilityChanged(ctx);
public Channel channel() {
return ctx.channel();
}
@Override
public EventExecutor executor() {
return ctx.executor();
}
@Override
public String name() {
return ctx.name();
}
@Override
public ChannelHandler handler() {
return ctx.handler();
}
@Override
public boolean isRemoved() {
return removed || ctx.isRemoved();
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
ctx.fireChannelRegistered();
return this;
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
ctx.fireChannelUnregistered();
return this;
}
@Override
public ChannelHandlerContext fireChannelActive() {
ctx.fireChannelActive();
return this;
}
@Override
public ChannelHandlerContext fireChannelInactive() {
ctx.fireChannelInactive();
return this;
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
ctx.fireExceptionCaught(cause);
return this;
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object event) {
ctx.fireUserEventTriggered(event);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
ctx.fireChannelRead(msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
ctx.fireChannelReadComplete();
return this;
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
ctx.fireChannelWritabilityChanged();
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return ctx.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return ctx.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return ctx.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return ctx.disconnect();
}
@Override
public ChannelFuture close() {
return ctx.close();
}
@Override
public ChannelFuture deregister() {
return ctx.deregister();
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return ctx.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return ctx.connect(remoteAddress, promise);
}
@Override
public ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
return ctx.disconnect(promise);
}
@Override
public ChannelFuture close(ChannelPromise promise) {
return ctx.close(promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return ctx.deregister();
}
@Override
public ChannelHandlerContext read() {
ctx.read();
return this;
}
@Override
public ChannelFuture write(Object msg) {
return ctx.write(msg);
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return ctx.write(msg, promise);
}
@Override
public ChannelHandlerContext flush() {
ctx.flush();
return this;
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return ctx.writeAndFlush(msg, promise);
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return ctx.writeAndFlush(msg);
}
@Override
public ChannelPipeline pipeline() {
return ctx.pipeline();
}
@Override
public ByteBufAllocator alloc() {
return ctx.alloc();
}
@Override
public ChannelPromise newPromise() {
return ctx.newPromise();
}
@Override
public ChannelProgressivePromise newProgressivePromise() {
return ctx.newProgressivePromise();
}
@Override
public ChannelFuture newSucceededFuture() {
return ctx.newSucceededFuture();
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return ctx.newFailedFuture(cause);
}
@Override
public ChannelPromise voidPromise() {
return ctx.voidPromise();
}
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
return ctx.attr(key);
}
@Override
public <T> boolean hasAttr(AttributeKey<T> key) {
return ctx.hasAttr(key);
}
final void remove() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
remove0();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
remove0();
}
});
}
}
private void remove0() {
if (!removed) {
removed = true;
try {
handler.handlerRemoved(this);
} catch (Throwable cause) {
fireExceptionCaught(new ChannelPipelineException(
handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
}
}
}
}
}

View File

@ -337,7 +337,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return invoker;
}
String generateName(ChannelHandler handler) {
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
@ -416,7 +416,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return context;
}
void remove0(AbstractChannelHandlerContext ctx) {
private void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;

View File

@ -0,0 +1,324 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import static org.junit.Assert.*;
public class CombinedChannelDuplexHandlerTest {
private static final Object MSG = new Object();
private static final SocketAddress ADDRESS = new InetSocketAddress(0);
private enum Event {
REGISTERED,
UNREGISTERED,
ACTIVE,
INACTIVE,
CHANNEL_READ,
CHANNEL_READ_COMPLETE,
EXCEPTION_CAUGHT,
USER_EVENT_TRIGGERED,
CHANNEL_WRITABILITY_CHANGED,
HANDLER_ADDED,
HANDLER_REMOVED,
BIND,
CONNECT,
WRITE,
FLUSH,
READ,
REGISTER,
DEREGISTER,
CLOSE,
DISCONNECT
}
@Test(expected = IllegalStateException.class)
public void testInboundRemoveBeforeAdded() {
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
new ChannelInboundHandlerAdapter(), new ChannelOutboundHandlerAdapter());
handler.removeInboundHandler();
}
@Test(expected = IllegalStateException.class)
public void testOutboundRemoveBeforeAdded() {
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
new ChannelInboundHandlerAdapter(), new ChannelOutboundHandlerAdapter());
handler.removeOutboundHandler();
}
@Test(expected = IllegalArgumentException.class)
public void testInboundHandlerImplementsOutboundHandler() {
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
new ChannelDuplexHandler(), new ChannelOutboundHandlerAdapter());
}
@Test(expected = IllegalArgumentException.class)
public void testOutboundHandlerImplementsInbboundHandler() {
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
new ChannelInboundHandlerAdapter(), new ChannelDuplexHandler());
}
@Test(expected = IllegalStateException.class)
public void testInitNotCalledBeforeAdded() throws Exception {
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>() { };
handler.handlerAdded(null);
}
@Test
public void testExceptionCaughtBothCombinedHandlers() {
final Exception exception = new Exception();
final Queue<ChannelHandler> queue = new ArrayDeque<ChannelHandler>();
ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
assertSame(exception, cause);
queue.add(this);
ctx.fireExceptionCaught(cause);
}
};
ChannelOutboundHandler outboundHandler = new ChannelOutboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
assertSame(exception, cause);
queue.add(this);
ctx.fireExceptionCaught(cause);
}
};
ChannelInboundHandler lastHandler = new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
assertSame(exception, cause);
queue.add(this);
}
};
EmbeddedChannel channel = new EmbeddedChannel(
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
inboundHandler, outboundHandler), lastHandler);
channel.pipeline().fireExceptionCaught(exception);
assertFalse(channel.finish());
assertSame(inboundHandler, queue.poll());
assertSame(outboundHandler, queue.poll());
assertSame(lastHandler, queue.poll());
assertTrue(queue.isEmpty());
}
@Test
public void testInboundEvents() {
final Queue<Event> queue = new ArrayDeque<Event>();
ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.HANDLER_ADDED);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.HANDLER_REMOVED);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.REGISTERED);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.UNREGISTERED);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.ACTIVE);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.INACTIVE);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
queue.add(Event.CHANNEL_READ);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.CHANNEL_READ_COMPLETE);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
queue.add(Event.USER_EVENT_TRIGGERED);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.CHANNEL_WRITABILITY_CHANGED);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
queue.add(Event.EXCEPTION_CAUGHT);
}
};
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
inboundHandler, new ChannelOutboundHandlerAdapter());
EmbeddedChannel channel = new EmbeddedChannel(handler);
channel.pipeline().fireChannelWritabilityChanged();
channel.pipeline().fireUserEventTriggered(MSG);
channel.pipeline().fireChannelRead(MSG);
channel.pipeline().fireChannelReadComplete();
assertEquals(Event.HANDLER_ADDED, queue.poll());
assertEquals(Event.REGISTERED, queue.poll());
assertEquals(Event.ACTIVE, queue.poll());
assertEquals(Event.CHANNEL_WRITABILITY_CHANGED, queue.poll());
assertEquals(Event.USER_EVENT_TRIGGERED, queue.poll());
assertEquals(Event.CHANNEL_READ, queue.poll());
assertEquals(Event.CHANNEL_READ_COMPLETE, queue.poll());
handler.removeInboundHandler();
assertEquals(Event.HANDLER_REMOVED, queue.poll());
// These should not be handled by the inboundHandler anymore as it was removed before
channel.pipeline().fireChannelWritabilityChanged();
channel.pipeline().fireUserEventTriggered(MSG);
channel.pipeline().fireChannelRead(MSG);
channel.pipeline().fireChannelReadComplete();
// Should have not received any more events as it was removed before via removeInboundHandler()
assertTrue(queue.isEmpty());
assertTrue(channel.finish());
assertTrue(queue.isEmpty());
}
@Test
public void testOutboundEvents() {
final Queue<Event> queue = new ArrayDeque<Event>();
ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter();
ChannelOutboundHandler outboundHandler = new ChannelOutboundHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.HANDLER_ADDED);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.HANDLER_REMOVED);
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
queue.add(Event.BIND);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
queue.add(Event.CONNECT);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
queue.add(Event.DISCONNECT);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
queue.add(Event.CLOSE);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
queue.add(Event.DEREGISTER);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.READ);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
queue.add(Event.WRITE);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
queue.add(Event.FLUSH);
}
};
CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler> handler =
new CombinedChannelDuplexHandler<ChannelInboundHandler, ChannelOutboundHandler>(
inboundHandler, outboundHandler);
EmbeddedChannel channel = new EmbeddedChannel();
channel.pipeline().addFirst(handler);
doOutboundOperations(channel);
assertEquals(Event.HANDLER_ADDED, queue.poll());
assertEquals(Event.BIND, queue.poll());
assertEquals(Event.CONNECT, queue.poll());
assertEquals(Event.WRITE, queue.poll());
assertEquals(Event.FLUSH, queue.poll());
assertEquals(Event.READ, queue.poll());
assertEquals(Event.CLOSE, queue.poll());
assertEquals(Event.CLOSE, queue.poll());
assertEquals(Event.DEREGISTER, queue.poll());
handler.removeOutboundHandler();
assertEquals(Event.HANDLER_REMOVED, queue.poll());
// These should not be handled by the inboundHandler anymore as it was removed before
doOutboundOperations(channel);
// Should have not received any more events as it was removed before via removeInboundHandler()
assertTrue(queue.isEmpty());
assertTrue(channel.finish());
assertTrue(queue.isEmpty());
}
private static void doOutboundOperations(Channel channel) {
channel.pipeline().bind(ADDRESS);
channel.pipeline().connect(ADDRESS);
channel.pipeline().write(MSG);
channel.pipeline().flush();
channel.pipeline().read();
channel.pipeline().disconnect();
channel.pipeline().close();
channel.pipeline().deregister();
}
}