[#1236] Fix problem where adding a new ChannelHandler could block the eventloop

This change also introduce a few other changes which was needed:
 * ChannelHandler.beforeAdd(...) and ChannelHandler.beforeRemove(...) were removed
 * ChannelHandler.afterAdd(...) -> handlerAdded(...)
 * ChannelHandler.afterRemoved(...) -> handlerRemoved(...)
 * SslHandler.handshake() -> SslHandler.hanshakeFuture() as the handshake is triggered automatically after
   the Channel becomes active
This commit is contained in:
Norman Maurer 2013-04-05 15:46:18 +02:00
parent 4a5dc32224
commit ca5554dfe7
26 changed files with 410 additions and 485 deletions

View File

@ -202,9 +202,9 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
} }
@Override @Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
cleanup(); cleanup();
super.afterRemove(ctx); super.handlerRemoved(ctx);
} }
@Override @Override

View File

@ -228,9 +228,9 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
protected abstract Result beginEncode(HttpResponse headers, String acceptEncoding) throws Exception; protected abstract Result beginEncode(HttpResponse headers, String acceptEncoding) throws Exception;
@Override @Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
cleanup(); cleanup();
super.afterRemove(ctx); super.handlerRemoved(ctx);
} }
@Override @Override

View File

@ -203,7 +203,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx; this.ctx = ctx;
} }
} }

View File

@ -137,7 +137,7 @@ public class WebSocketClientProtocolHandler extends WebSocketProtocolHandler {
} }
@Override @Override
public void afterAdd(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline cp = ctx.pipeline(); ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketClientProtocolHandshakeHandler.class) == null) { if (cp.get(WebSocketClientProtocolHandshakeHandler.class) == null) {
// Add the WebSocketClientProtocolHandshakeHandler before this one. // Add the WebSocketClientProtocolHandshakeHandler before this one.

View File

@ -80,7 +80,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
} }
@Override @Override
public void afterAdd(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline cp = ctx.pipeline(); ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) { if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
// Add the WebSocketHandshakeHandler before this one. // Add the WebSocketHandshakeHandler before this one.

View File

@ -60,7 +60,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<SpdyDataOrControlFram
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.channel().closeFuture().addListener(new ChannelFutureListener() { ctx.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {

View File

@ -132,7 +132,7 @@ public class HttpObjecctAggregatorTest {
HttpObjectAggregator aggr = new HttpObjectAggregator(Integer.MAX_VALUE); HttpObjectAggregator aggr = new HttpObjectAggregator(Integer.MAX_VALUE);
ChannelHandlerContext ctx = EasyMock.createMock(ChannelHandlerContext.class); ChannelHandlerContext ctx = EasyMock.createMock(ChannelHandlerContext.class);
EasyMock.replay(ctx); EasyMock.replay(ctx);
aggr.beforeAdd(ctx); aggr.handlerAdded(ctx);
aggr.setMaxCumulationBufferComponents(10); aggr.setMaxCumulationBufferComponents(10);
} }
} }

View File

@ -60,11 +60,6 @@ public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler
outboundMsgMatcher = TypeParameterMatcher.get(outboundMessageType); outboundMsgMatcher = TypeParameterMatcher.get(outboundMessageType);
} }
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
decoder.beforeAdd(ctx);
}
@Override @Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return decoder.newInboundBuffer(ctx); return decoder.newInboundBuffer(ctx);

View File

@ -404,7 +404,7 @@ public class JZlibEncoder extends ZlibEncoder {
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx; this.ctx = ctx;
} }
} }

View File

@ -262,7 +262,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx; this.ctx = ctx;
} }
} }

View File

@ -64,7 +64,7 @@ public class WebSocketClientHandler extends ChannelInboundMessageHandlerAdapter<
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handshakeFuture = ctx.newPromise(); handshakeFuture = ctx.newPromise();
} }

View File

@ -16,13 +16,13 @@
package io.netty.example.securechat; package io.netty.example.securechat;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.logging.Level; import java.util.logging.Level;
@ -42,9 +42,10 @@ public class SecureChatServerHandler extends ChannelInboundMessageHandlerAdapter
public void channelActive(final ChannelHandlerContext ctx) throws Exception { public void channelActive(final ChannelHandlerContext ctx) throws Exception {
// Once session is secured, send a greeting and register the channel to the global channel // Once session is secured, send a greeting and register the channel to the global channel
// list so the channel received the messages from others. // list so the channel received the messages from others.
ctx.pipeline().get(SslHandler.class).handshake().addListener(new ChannelFutureListener() { ctx.pipeline().get(SslHandler.class).handshakeFuture().addListener(
new GenericFutureListener<Future<Channel>>() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(Future<Channel> future) throws Exception {
ctx.write( ctx.write(
"Welcome to " + InetAddress.getLocalHost().getHostName() + "Welcome to " + InetAddress.getLocalHost().getHostName() +
" secure chat service!\n"); " secure chat service!\n");

View File

@ -29,8 +29,11 @@ import io.netty.channel.ChannelOutboundByteHandler;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelStateHandler; import io.netty.channel.ChannelStateHandler;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateExecutor; import io.netty.util.concurrent.ImmediateExecutor;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -47,8 +50,6 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -62,10 +63,9 @@ import java.util.regex.Pattern;
* *
* <h3>Beginning the handshake</h3> * <h3>Beginning the handshake</h3>
* <p> * <p>
* You must make sure not to write a message while the * You must make sure not to write a message while the handshake is in progress unless you are
* {@linkplain #handshake() handshake} is in progress unless you are * renegotiating. You will be notified by the {@link Future} which is
* renegotiating. You will be notified by the {@link ChannelFuture} which is * returned by the {@link #handshakeFuture()} method when the handshake
* returned by the {@link #handshake()} method when the handshake
* process succeeds or fails. * process succeeds or fails.
* <p> * <p>
* Beside using the handshake {@link ChannelFuture} to get notified about the completation of the handshake it's * Beside using the handshake {@link ChannelFuture} to get notified about the completation of the handshake it's
@ -128,7 +128,7 @@ import java.util.regex.Pattern;
* <li>create a new {@link SslHandler} instance with {@code startTls} flag set * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
* to {@code false},</li> * to {@code false},</li>
* <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li> * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
* <li>Initiate SSL handshake by calling {@link SslHandler#handshake()}.</li> * <li>Initiate SSL handshake.</li>
* </ol> * </ol>
* *
* <h3>Known issues</h3> * <h3>Known issues</h3>
@ -172,8 +172,8 @@ public class SslHandler
private boolean sentFirstMessage; private boolean sentFirstMessage;
private WritableByteChannel bufferChannel; private WritableByteChannel bufferChannel;
private final Queue<ChannelPromise> handshakePromises = new ArrayDeque<ChannelPromise>(); private final LazyChannelPromise handshakePromise = new LazyChannelPromise();
private final SSLEngineInboundCloseFuture sslCloseFuture = new SSLEngineInboundCloseFuture(); private final LazyChannelPromise sslCloseFuture = new LazyChannelPromise();
private final CloseNotifyListener closeNotifyWriteListener = new CloseNotifyListener(); private final CloseNotifyListener closeNotifyWriteListener = new CloseNotifyListener();
private volatile long handshakeTimeoutMillis = 10000; private volatile long handshakeTimeoutMillis = 10000;
@ -286,68 +286,10 @@ public class SslHandler
} }
/** /**
* Starts the SSL / TLS handshake and returns a {@link ChannelFuture} that will * Returns a {@link Future} that will get notified once the handshake completes.
* get notified once the handshake completes.
*/ */
public ChannelFuture handshake() { public Future<Channel> handshakeFuture() {
return handshake(ctx.newPromise()); return handshakePromise;
}
/**
* Starts an SSL / TLS handshake for the specified channel.
*
* @return a {@link ChannelPromise} which is notified when the handshake
* succeeds or fails.
*/
public ChannelFuture handshake(final ChannelPromise promise) {
final ChannelHandlerContext ctx = this.ctx;
final ScheduledFuture<?> timeoutFuture;
if (handshakeTimeoutMillis > 0) {
timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
if (promise.isDone()) {
return;
}
SSLException e = new SSLException("handshake timed out");
if (promise.tryFailure(e)) {
ctx.fireExceptionCaught(e);
ctx.close();
}
}
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
} else {
timeoutFuture = null;
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
}
});
ctx.executor().execute(new Runnable() {
@Override
public void run() {
try {
engine.beginHandshake();
handshakePromises.add(promise);
flush0(ctx, ctx.newPromise(), true);
} catch (Exception e) {
if (promise.tryFailure(e)) {
ctx.fireExceptionCaught(e);
ctx.close();
}
}
}
});
return promise;
} }
/** /**
@ -390,7 +332,7 @@ public class SslHandler
* For more informations see the apidocs of {@link SSLEngine} * For more informations see the apidocs of {@link SSLEngine}
* *
*/ */
public ChannelFuture sslCloseFuture() { public Future<Channel> sslCloseFuture() {
return sslCloseFuture; return sslCloseFuture;
} }
@ -882,7 +824,7 @@ public class SslHandler
switch (result.getStatus()) { switch (result.getStatus()) {
case CLOSED: case CLOSED:
// notify about the CLOSED state of the SSLEngine. See #137 // notify about the CLOSED state of the SSLEngine. See #137
sslCloseFuture.setClosed(); sslCloseFuture.trySuccess(ctx.channel());
break; break;
case BUFFER_UNDERFLOW: case BUFFER_UNDERFLOW:
break loop; break loop;
@ -966,15 +908,7 @@ public class SslHandler
* Notify all the handshake futures about the successfully handshake * Notify all the handshake futures about the successfully handshake
*/ */
private void setHandshakeSuccess() { private void setHandshakeSuccess() {
try { if (handshakePromise.trySuccess(ctx.channel())) {
for (;;) {
ChannelPromise p = handshakePromises.poll();
if (p == null) {
break;
}
p.setSuccess();
}
} finally {
ctx.fireUserEventTriggered(HANDSHAKE_SUCCESS_EVENT); ctx.fireUserEventTriggered(HANDSHAKE_SUCCESS_EVENT);
} }
} }
@ -983,39 +917,25 @@ public class SslHandler
* Notify all the handshake futures about the failure during the handshake. * Notify all the handshake futures about the failure during the handshake.
*/ */
private void setHandshakeFailure(Throwable cause) { private void setHandshakeFailure(Throwable cause) {
// Release all resources such as internal buffers that SSLEngine
// is managing.
engine.closeOutbound();
final boolean disconnected = cause == null || cause instanceof ClosedChannelException;
try { try {
// Release all resources such as internal buffers that SSLEngine engine.closeInbound();
// is managing. } catch (SSLException e) {
engine.closeOutbound(); if (!disconnected) {
logger.warn("SSLEngine.closeInbound() raised an exception after a handshake failure.", e);
final boolean disconnected = cause == null || cause instanceof ClosedChannelException; } else if (!closeNotifyWriteListener.done) {
try { logger.warn("SSLEngine.closeInbound() raised an exception due to closed connection.", e);
engine.closeInbound(); } else {
} catch (SSLException e) { // cause == null && sentCloseNotify
if (!disconnected) { // closeInbound() will raise an exception with bogus truncation attack warning.
logger.warn("SSLEngine.closeInbound() raised an exception after a handshake failure.", e);
} else if (!closeNotifyWriteListener.done) {
logger.warn("SSLEngine.closeInbound() raised an exception due to closed connection.", e);
} else {
// cause == null && sentCloseNotify
// closeInbound() will raise an exception with bogus truncation attack warning.
}
} }
}
if (!handshakePromises.isEmpty()) { if (handshakePromise.tryFailure(cause)) {
if (cause == null) {
cause = new ClosedChannelException();
}
for (;;) {
ChannelPromise p = handshakePromises.poll();
if (p == null) {
break;
}
p.setFailure(cause);
}
}
} finally {
ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause)); ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
} }
flush0(ctx, 0, cause); flush0(ctx, 0, cause);
@ -1040,33 +960,71 @@ public class SslHandler
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx; this.ctx = ctx;
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive()) { if (ctx.channel().isActive()) {
// channelActvie() event has been fired already, which means this.channelActive() will // channelActvie() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead. // not be invoked. We have to initialize here instead.
handshake(); handshake0();
} else { } else {
// channelActive() event has not been fired yet. this.channelOpen() will be invoked // channelActive() event has not been fired yet. this.channelOpen() will be invoked
// and initialization will occur there. // and initialization will occur there.
} }
} }
private Future<Channel> handshake0() {
final ScheduledFuture<?> timeoutFuture;
if (handshakeTimeoutMillis > 0) {
timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
if (handshakePromise.isDone()) {
return;
}
SSLException e = new SSLException("handshake timed out");
if (handshakePromise.tryFailure(e)) {
ctx.fireExceptionCaught(e);
ctx.close();
}
}
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
} else {
timeoutFuture = null;
}
handshakePromise.addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> f) throws Exception {
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
}
}
});
try {
engine.beginHandshake();
flush0(ctx, ctx.newPromise(), true);
} catch (Exception e) {
if (handshakePromise.tryFailure(e)) {
ctx.fireExceptionCaught(e);
ctx.close();
}
}
return handshakePromise;
}
/** /**
* Calls {@link #handshake()} once the {@link Channel} is connected * Issues a SSL handshake once connected when used in client-mode
*/ */
@Override @Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception { public void channelActive(final ChannelHandlerContext ctx) throws Exception {
if (!startTls && engine.getUseClientMode()) { if (!startTls && engine.getUseClientMode()) {
// issue and handshake and add a listener to it which will fire an exception event if // issue and handshake and add a listener to it which will fire an exception event if
// an exception was thrown while doing the handshake // an exception was thrown while doing the handshake
handshake().addListener(new ChannelFutureListener() { handshake0().addListener(new GenericFutureListener<Future<Channel>>() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(Future<Channel> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
ctx.pipeline().fireExceptionCaught(future.cause()); ctx.pipeline().fireExceptionCaught(future.cause());
ctx.close(); ctx.close();
@ -1074,7 +1032,6 @@ public class SslHandler
} }
}); });
} }
ctx.fireChannelActive(); ctx.fireChannelActive();
} }
@ -1131,43 +1088,14 @@ public class SslHandler
} }
} }
private final class SSLEngineInboundCloseFuture extends DefaultChannelPromise { private final class LazyChannelPromise extends DefaultPromise<Channel> {
public SSLEngineInboundCloseFuture() {
super(null);
}
void setClosed() {
super.trySuccess();
}
@Override @Override
public Channel channel() { protected EventExecutor executor() {
if (ctx == null) { if (ctx == null) {
// Maybe we should better throw an IllegalStateException() ? throw new IllegalStateException();
return null;
} else {
return ctx.channel();
} }
} return ctx.executor();
@Override
public boolean trySuccess() {
return false;
}
@Override
public boolean tryFailure(Throwable cause) {
return false;
}
@Override
public ChannelPromise setSuccess() {
throw new IllegalStateException();
}
@Override
public ChannelPromise setFailure(Throwable cause) {
throw new IllegalStateException();
} }
} }
} }

View File

@ -348,17 +348,11 @@ public class ChunkedWriteHandler
} }
} }
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// try to flush again a last time.
//
// See #304
doFlush(ctx);
}
// This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events // This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
@Override @Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
doFlush(ctx);
// Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the // Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the
// ChannelFuture and the registered FutureListener. See #304 // ChannelFuture and the registered FutureListener. See #304
discard(ctx, new ChannelException(ChunkedWriteHandler.class.getSimpleName() + " removed from pipeline.")); discard(ctx, new ChannelException(ChunkedWriteHandler.class.getSimpleName() + " removed from pipeline."));

View File

@ -210,7 +210,7 @@ public class IdleStateHandler extends ChannelStateHandlerAdapter implements Chan
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() & ctx.channel().isRegistered()) { if (ctx.channel().isActive() & ctx.channel().isRegistered()) {
// channelActvie() event has been fired already, which means this.channelActive() will // channelActvie() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead. // not be invoked. We have to initialize here instead.
@ -222,7 +222,7 @@ public class IdleStateHandler extends ChannelStateHandlerAdapter implements Chan
} }
@Override @Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
destroy(); destroy();
} }

View File

@ -103,7 +103,7 @@ public class ReadTimeoutHandler extends ChannelStateHandlerAdapter {
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) { if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
// channelActvie() event has been fired already, which means this.channelActive() will // channelActvie() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead. // not be invoked. We have to initialize here instead.
@ -115,7 +115,7 @@ public class ReadTimeoutHandler extends ChannelStateHandlerAdapter {
} }
@Override @Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
destroy(); destroy();
} }

View File

@ -313,7 +313,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler
} }
@Override @Override
public void beforeRemove(ChannelHandlerContext ctx) { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (trafficCounter != null) { if (trafficCounter != null) {
trafficCounter.stop(); trafficCounter.stop();
} }

View File

@ -85,18 +85,10 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" + TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" +
ctx.channel().id(), checkInterval); ctx.channel().id(), checkInterval);
setTrafficCounter(trafficCounter); setTrafficCounter(trafficCounter);
trafficCounter.start(); trafficCounter.start();
} }
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
if (trafficCounter != null) {
trafficCounter.stop();
}
}
} }

View File

@ -17,23 +17,17 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.BufUtil;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter; import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOperationHandlerAdapter;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToByteEncoder;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.testsuite.util.BogusSslContextFactory; import io.netty.testsuite.util.BogusSslContextFactory;
import io.netty.util.concurrent.Future;
import org.junit.Test; import org.junit.Test;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
@ -105,7 +99,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
Channel sc = sb.bind().sync().channel(); Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel(); Channel cc = cb.connect().sync().channel();
ChannelFuture hf = cc.pipeline().get(SslHandler.class).handshake(); Future<Channel> hf = cc.pipeline().get(SslHandler.class).handshakeFuture();
cc.write(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE)); cc.write(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE));
final AtomicBoolean firstByteWriteFutureDone = new AtomicBoolean(); final AtomicBoolean firstByteWriteFutureDone = new AtomicBoolean();

View File

@ -19,7 +19,6 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.BufType; import io.netty.buffer.BufType;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
@ -34,6 +33,7 @@ import io.netty.handler.logging.ByteLoggingHandler;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.testsuite.util.BogusSslContextFactory; import io.netty.testsuite.util.BogusSslContextFactory;
import io.netty.util.concurrent.Future;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -145,7 +145,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
private class StartTlsClientHandler extends ChannelInboundMessageHandlerAdapter<String> { private class StartTlsClientHandler extends ChannelInboundMessageHandlerAdapter<String> {
private final SslHandler sslHandler; private final SslHandler sslHandler;
private ChannelFuture handshakeFuture; private Future<Channel> handshakeFuture;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
StartTlsClientHandler(SSLEngine engine) { StartTlsClientHandler(SSLEngine engine) {
@ -163,7 +163,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
public void messageReceived(final ChannelHandlerContext ctx, String msg) throws Exception { public void messageReceived(final ChannelHandlerContext ctx, String msg) throws Exception {
if ("StartTlsResponse".equals(msg)) { if ("StartTlsResponse".equals(msg)) {
ctx.pipeline().addAfter("logger", "ssl", sslHandler); ctx.pipeline().addAfter("logger", "ssl", sslHandler);
handshakeFuture = sslHandler.handshake(); handshakeFuture = sslHandler.handshakeFuture();
ctx.write("EncryptedRequest\n"); ctx.write("EncryptedRequest\n");
return; return;
} }

View File

@ -189,25 +189,15 @@ import java.lang.annotation.Target;
*/ */
public interface ChannelHandler { public interface ChannelHandler {
/**
* Gets called before the {@link ChannelHandler} is added to the actual context.
*/
void beforeAdd(ChannelHandlerContext ctx) throws Exception;
/** /**
* Gets called after the {@link ChannelHandler} was added to the actual context. * Gets called after the {@link ChannelHandler} was added to the actual context.
*/ */
void afterAdd(ChannelHandlerContext ctx) throws Exception; void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called before the {@link ChannelHandler} is removed from the actual context.
*/
void beforeRemove(ChannelHandlerContext ctx) throws Exception;
/** /**
* Gets called after the {@link ChannelHandler} was removed from the actual context. * Gets called after the {@link ChannelHandler} was removed from the actual context.
*/ */
void afterRemove(ChannelHandlerContext ctx) throws Exception; void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
/** /**
* Gets called if a {@link Throwable} was thrown. * Gets called if a {@link Throwable} was thrown.

View File

@ -36,7 +36,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler {
* Do nothing by default, sub-classes may override this method. * Do nothing by default, sub-classes may override this method.
*/ */
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP // NOOP
} }
@ -44,23 +44,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler {
* Do nothing by default, sub-classes may override this method. * Do nothing by default, sub-classes may override this method.
*/ */
@Override @Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
/**
* Do nothing by default, sub-classes may override this method.
*/
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP // NOOP
} }

View File

@ -117,45 +117,26 @@ public class CombinedChannelDuplexHandler extends ChannelDuplexHandler {
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (stateHandler == null) { if (stateHandler == null) {
throw new IllegalStateException( throw new IllegalStateException(
"init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() + "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
" if " + CombinedChannelDuplexHandler.class.getSimpleName() + " if " + CombinedChannelDuplexHandler.class.getSimpleName() +
" was constructed with the default constructor."); " was constructed with the default constructor.");
} }
try { try {
stateHandler.beforeAdd(ctx); stateHandler.handlerAdded(ctx);
} finally { } finally {
operationHandler.beforeAdd(ctx); operationHandler.handlerAdded(ctx);
} }
} }
@Override @Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
try { try {
stateHandler.afterAdd(ctx); stateHandler.handlerRemoved(ctx);
} finally { } finally {
operationHandler.afterAdd(ctx); operationHandler.handlerRemoved(ctx);
}
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
try {
stateHandler.beforeRemove(ctx);
} finally {
operationHandler.beforeRemove(ctx);
}
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
try {
stateHandler.afterRemove(ctx);
} finally {
operationHandler.afterRemove(ctx);
} }
} }

View File

@ -113,31 +113,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) { public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
final DefaultChannelHandlerContext newCtx;
synchronized (this) { synchronized (this) {
checkDuplicateName(name); checkDuplicateName(name);
newCtx = new DefaultChannelHandlerContext(this, group, name, handler); DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addFirst0(name, newCtx);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addFirst0(name, newCtx);
return this;
}
} }
// Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock
executeOnEventLoop(newCtx, new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addFirst0(name, newCtx);
}
}
});
return this; return this;
} }
@ -163,31 +144,13 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
final DefaultChannelHandlerContext newCtx;
synchronized (this) { synchronized (this) {
checkDuplicateName(name); checkDuplicateName(name);
newCtx = new DefaultChannelHandlerContext(this, group, name, handler); DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { addLast0(name, newCtx);
addLast0(name, newCtx);
return this;
}
} }
// Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock
executeOnEventLoop(newCtx, new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addLast0(name, newCtx);
}
}
});
return this; return this;
} }
@ -215,33 +178,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addBefore( public ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
final DefaultChannelHandlerContext ctx;
final DefaultChannelHandlerContext newCtx;
synchronized (this) { synchronized (this) {
ctx = getContextOrDie(baseName); DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name); checkDuplicateName(name);
newCtx = new DefaultChannelHandlerContext(this, group, name, handler); DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addBefore0(name, ctx, newCtx);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
addBefore0(name, ctx, newCtx);
return this;
}
} }
// Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock
executeOnEventLoop(newCtx, new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addBefore0(name, ctx, newCtx);
}
}
});
return this; return this;
} }
@ -267,33 +209,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline addAfter( public ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
final DefaultChannelHandlerContext ctx;
final DefaultChannelHandlerContext newCtx;
synchronized (this) { synchronized (this) {
ctx = getContextOrDie(baseName); DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name); checkDuplicateName(name);
newCtx = new DefaultChannelHandlerContext(this, group, name, handler); DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { addAfter0(name, ctx, newCtx);
addAfter0(name, ctx, newCtx);
return this;
}
} }
// Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock
executeOnEventLoop(newCtx, new Runnable() {
@Override
public void run() {
synchronized (DefaultChannelPipeline.this) {
checkDuplicateName(name);
addAfter0(name, ctx, newCtx);
}
}
});
return this; return this;
} }
@ -448,8 +371,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
private void remove0(DefaultChannelHandlerContext ctx) { private void remove0(DefaultChannelHandlerContext ctx) {
callBeforeRemove(ctx);
DefaultChannelHandlerContext prev = ctx.prev; DefaultChannelHandlerContext prev = ctx.prev;
DefaultChannelHandlerContext next = ctx.next; DefaultChannelHandlerContext next = ctx.next;
prev.next = next; prev.next = next;
@ -542,7 +463,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
newCtx.prev = prev; newCtx.prev = prev;
newCtx.next = next; newCtx.next = next;
callBeforeRemove(ctx);
callBeforeAdd(newCtx); callBeforeAdd(newCtx);
prev.next = newCtx; prev.next = newCtx;
@ -553,36 +473,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
name2ctx.put(newName, newCtx); name2ctx.put(newName, newCtx);
ChannelPipelineException removeException = null; // remove old and add new
ChannelPipelineException addException = null; callAfterRemove(ctx, newCtx, newCtx);
boolean removed = false; callAfterAdd(newCtx);
try {
callAfterRemove(ctx, newCtx, newCtx);
removed = true;
} catch (ChannelPipelineException e) {
removeException = e;
}
boolean added = false;
try {
callAfterAdd(newCtx);
added = true;
} catch (ChannelPipelineException e) {
addException = e;
}
if (!removed && !added) {
logger.warn(removeException.getMessage(), removeException);
logger.warn(addException.getMessage(), addException);
throw new ChannelPipelineException(
"Both " + ctx.handler().getClass().getName() +
".afterRemove() and " + newCtx.handler().getClass().getName() +
".afterAdd() failed; see logs.");
} else if (!removed) {
throw removeException;
} else if (!added) {
throw addException;
}
} }
private static void callBeforeAdd(ChannelHandlerContext ctx) { private static void callBeforeAdd(ChannelHandlerContext ctx) {
@ -596,18 +489,24 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
h.added = true; h.added = true;
} }
try {
handler.beforeAdd(ctx);
} catch (Throwable t) {
throw new ChannelPipelineException(
handler.getClass().getName() +
".beforeAdd() has thrown an exception; not adding.", t);
}
} }
private void callAfterAdd(ChannelHandlerContext ctx) { private void callAfterAdd(final ChannelHandlerContext ctx) {
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
callAfterAdd0(ctx);
}
});
return;
}
callAfterAdd0(ctx);
}
private void callAfterAdd0(final ChannelHandlerContext ctx) {
try { try {
ctx.handler().afterAdd(ctx); ctx.handler().handlerAdded(ctx);
} catch (Throwable t) { } catch (Throwable t) {
boolean removed = false; boolean removed = false;
try { try {
@ -620,28 +519,33 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
if (removed) { if (removed) {
throw new ChannelPipelineException( fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ctx.handler().getClass().getName() +
".afterAdd() has thrown an exception; removed.", t); ".afterAdd() has thrown an exception; removed.", t));
} else { } else {
throw new ChannelPipelineException( fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ctx.handler().getClass().getName() +
".afterAdd() has thrown an exception; also failed to remove.", t); ".afterAdd() has thrown an exception; also failed to remove.", t));
} }
} }
} }
private static void callBeforeRemove(ChannelHandlerContext ctx) { private void callAfterRemove(
try { final DefaultChannelHandlerContext ctx, final DefaultChannelHandlerContext ctxPrev,
ctx.handler().beforeRemove(ctx); final DefaultChannelHandlerContext ctxNext) {
} catch (Throwable t) { if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
throw new ChannelPipelineException( ctx.executor().execute(new Runnable() {
ctx.handler().getClass().getName() + @Override
".beforeRemove() has thrown an exception; not removing.", t); public void run() {
callAfterRemove0(ctx, ctxPrev, ctxNext);
}
});
return;
} }
callAfterRemove0(ctx, ctxPrev, ctxNext);
} }
private static void callAfterRemove( private void callAfterRemove0(
final DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext ctxPrev, final DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext ctxPrev,
DefaultChannelHandlerContext ctxNext) { DefaultChannelHandlerContext ctxNext) {
@ -649,11 +553,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// Notify the complete removal. // Notify the complete removal.
try { try {
handler.afterRemove(ctx); handler.handlerRemoved(ctx);
} catch (Throwable t) { } catch (Throwable t) {
throw new ChannelPipelineException( fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ctx.handler().getClass().getName() +
".afterRemove() has thrown an exception.", t); ".afterRemove() has thrown an exception.", t));
} }
ctx.forwardBufferContent(ctxPrev, ctxNext); ctx.forwardBufferContent(ctxPrev, ctxNext);
@ -661,26 +565,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
ctx.setRemoved(); ctx.setRemoved();
} }
/**
* Executes a task on the event loop and waits for it to finish. If the task is interrupted, then the
* current thread will be interrupted. It is expected that the task performs any appropriate locking.
* <p>
* If the {@link Runnable#run()} call throws a {@link Throwable}, but it is not an instance of
* {@link Error} or {@link RuntimeException}, then it is wrapped inside a
* {@link ChannelPipelineException} and that is thrown instead.</p>
*
* @param r execute this runnable
* @see Runnable#run()
* @see Future#get()
* @throws Error if the task threw this.
* @throws RuntimeException if the task threw this.
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
* {@link Throwable}.
*/
private static void executeOnEventLoop(DefaultChannelHandlerContext ctx, Runnable r) {
waitForFuture(ctx.executor().submit(r));
}
/** /**
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted. * Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
* It is expected that the task performs any appropriate locking. * It is expected that the task performs any appropriate locking.
@ -1138,16 +1022,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception { } public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception { }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { } public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
@Override @Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception { } public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception { }
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception { }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { }
@ -1231,22 +1109,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
@Override @Override
public final void beforeAdd(ChannelHandlerContext ctx) throws Exception { public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP // NOOP
} }
@Override @Override
public final void afterAdd(ChannelHandlerContext ctx) throws Exception { public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public final void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public final void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP // NOOP
} }

View File

@ -632,37 +632,53 @@ public class DefaultChannelPipelineTest {
assertTrue(((ChannelOutboundMessageHandlerImpl) handler2.operationHandler()).flushed); assertTrue(((ChannelOutboundMessageHandlerImpl) handler2.operationHandler()).flushed);
} }
@Test @Test(timeout = 20000)
public void testLifeCycleAware() { public void testLifeCycleAware() throws Exception {
LocalChannel channel = new LocalChannel(); LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup(); LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly(); group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
List<LifeCycleAwareTestHandler> handlers = new ArrayList<LifeCycleAwareTestHandler>(); final List<LifeCycleAwareTestHandler> handlers = new ArrayList<LifeCycleAwareTestHandler>();
final CountDownLatch addLatch = new CountDownLatch(20);
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
LifeCycleAwareTestHandler handler = new LifeCycleAwareTestHandler("handler-" + i); final LifeCycleAwareTestHandler handler = new LifeCycleAwareTestHandler("handler-" + i);
// Add handler. // Add handler.
pipeline.addFirst(handler.name, handler); pipeline.addFirst(handler.name, handler);
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
// Validate handler life-cycle methods called.
handler.validate(true, false);
// Validate handler life-cycle methods called. // Store handler into the list.
handler.validate(true, true, false, false); handlers.add(handler);
// Store handler into the list. addLatch.countDown();
handlers.add(handler); }
});
} }
addLatch.await();
// Change the order of remove operations over all handlers in the pipeline. // Change the order of remove operations over all handlers in the pipeline.
Collections.shuffle(handlers); Collections.shuffle(handlers);
for (LifeCycleAwareTestHandler handler : handlers) { final CountDownLatch removeLatch = new CountDownLatch(20);
for (final LifeCycleAwareTestHandler handler : handlers) {
assertSame(handler, pipeline.remove(handler.name)); assertSame(handler, pipeline.remove(handler.name));
// Validate handler life-cycle methods called. channel.eventLoop().execute(new Runnable() {
handler.validate(true, true, true, true); @Override
public void run() {
// Validate handler life-cycle methods called.
handler.validate(true, true);
removeLatch.countDown();
}
});
} }
removeLatch.await();
} }
@Test @Test
@ -888,9 +904,7 @@ public class DefaultChannelPipelineTest {
private static final class LifeCycleAwareTestHandler extends ChannelHandlerAdapter { private static final class LifeCycleAwareTestHandler extends ChannelHandlerAdapter {
private final String name; private final String name;
private boolean beforeAdd;
private boolean afterAdd; private boolean afterAdd;
private boolean beforeRemove;
private boolean afterRemove; private boolean afterRemove;
/** /**
@ -902,37 +916,21 @@ public class DefaultChannelPipelineTest {
this.name = name; this.name = name;
} }
public void validate(boolean beforeAdd, boolean afterAdd, boolean beforeRemove, boolean afterRemove) { public void validate(boolean afterAdd, boolean afterRemove) {
assertEquals(name, beforeAdd, this.beforeAdd);
assertEquals(name, afterAdd, this.afterAdd); assertEquals(name, afterAdd, this.afterAdd);
assertEquals(name, beforeRemove, this.beforeRemove);
assertEquals(name, afterRemove, this.afterRemove); assertEquals(name, afterRemove, this.afterRemove);
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
validate(false, false, false, false); validate(false, false);
beforeAdd = true;
}
@Override
public void afterAdd(ChannelHandlerContext ctx) {
validate(true, false, false, false);
afterAdd = true; afterAdd = true;
} }
@Override @Override
public void beforeRemove(ChannelHandlerContext ctx) { public void handlerRemoved(ChannelHandlerContext ctx) {
validate(true, true, false, false); validate(true, false);
beforeRemove = true;
}
@Override
public void afterRemove(ChannelHandlerContext ctx) {
validate(true, true, true, false);
afterRemove = true; afterRemove = true;
} }

View File

@ -21,6 +21,7 @@ import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
@ -30,6 +31,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundByteHandler; import io.netty.channel.ChannelOutboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelStateHandlerAdapter;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
@ -39,9 +41,14 @@ import org.junit.BeforeClass;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.util.Deque;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -330,6 +337,127 @@ public class LocalTransportThreadModelTest {
} }
} }
@Test(timeout = 30000)
@Ignore("needs to get fixed")
public void testConcurrentAddRemove() throws Throwable {
EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e3"));
EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e4"));
EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e5"));
final EventExecutorGroup[] groups = { e1, e2, e3, e4, e5 };
try {
Deque<EventRecordHandler.Events> events = new ConcurrentLinkedDeque<EventRecordHandler.Events>();
final EventForwardHandler h1 = new EventForwardHandler();
final EventForwardHandler h2 = new EventForwardHandler();
final EventForwardHandler h3 = new EventForwardHandler();
final EventForwardHandler h4 = new EventForwardHandler();
final EventForwardHandler h5 = new EventForwardHandler();
final EventRecordHandler h6 = new EventRecordHandler(events);
final Channel ch = new LocalChannel();
// inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null
// outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null
ch.pipeline().addLast(h1)
.addLast(e1, h2)
.addLast(e2, h3)
.addLast(e3, h4)
.addLast(e4, h5)
.addLast(e5, "recorder", h6);
l.register(ch).sync().channel().connect(localAddr).sync();
final int TOTAL_CNT = 8192;
final LinkedList<EventRecordHandler.Events> expectedEvents = events(TOTAL_CNT);
Throwable cause = new Throwable();
Thread pipelineModifier = new Thread(new Runnable() {
@Override
public void run() {
Random random = new Random();
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
return;
}
if (!ch.isRegistered()) {
continue;
}
//EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)];
ChannelHandler handler = ch.pipeline().removeFirst();
ch.pipeline().addBefore(groups[random.nextInt(groups.length)], "recorder",
UUID.randomUUID().toString(), handler);
}
}
});
pipelineModifier.setDaemon(true);
pipelineModifier.start();
for (int i = 0; i < TOTAL_CNT; i++) {
EventRecordHandler.Events event = expectedEvents.get(i);
switch (event) {
case EXCEPTION_CAUGHT:
ch.pipeline().fireExceptionCaught(cause);
break;
case INBOUND_BufFER_UPDATED:
ch.pipeline().fireInboundBufferUpdated();
break;
case READ_SUSPEND:
ch.pipeline().fireChannelReadSuspended();
break;
case USER_EVENT:
ch.pipeline().fireUserEventTriggered("");
break;
}
}
while (events.size() < TOTAL_CNT + 2) {
System.out.println(events.size() + " < " + (TOTAL_CNT + 2));
Thread.sleep(10);
}
ch.close().sync();
expectedEvents.addFirst(EventRecordHandler.Events.ACTIVE);
expectedEvents.addFirst(EventRecordHandler.Events.REGISTERED);
expectedEvents.addLast(EventRecordHandler.Events.INACTIVE);
expectedEvents.addLast(EventRecordHandler.Events.UNREGISTERED);
for (;;) {
EventRecordHandler.Events event = events.poll();
if (event == null) {
Assert.assertTrue(expectedEvents.isEmpty());
break;
}
Assert.assertEquals(expectedEvents.poll(), event);
}
} finally {
l.shutdown();
e1.shutdown();
e2.shutdown();
e3.shutdown();
e4.shutdown();
e5.shutdown();
}
}
private static LinkedList<EventRecordHandler.Events> events(int size) {
EventRecordHandler.Events[] events = { EventRecordHandler.Events.EXCEPTION_CAUGHT,
EventRecordHandler.Events.USER_EVENT, EventRecordHandler.Events.INBOUND_BufFER_UPDATED,
EventRecordHandler.Events.READ_SUSPEND};
Random random = new Random();
LinkedList<EventRecordHandler.Events> expectedEvents = new LinkedList<EventRecordHandler.Events>();
for (int i = 0; i < size; i++) {
expectedEvents.add(events[random.nextInt(events.length)]);
}
return expectedEvents;
}
private static class ThreadNameAuditor private static class ThreadNameAuditor
extends ChannelDuplexHandler extends ChannelDuplexHandler
implements ChannelInboundMessageHandler<Object>, implements ChannelInboundMessageHandler<Object>,
@ -794,4 +922,76 @@ public class LocalTransportThreadModelTest {
return t; return t;
} }
} }
@ChannelHandler.Sharable
private static final class EventForwardHandler extends ChannelDuplexHandler {
@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.flush(promise);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
ctx.fireInboundBufferUpdated();
}
}
private static final class EventRecordHandler extends ChannelStateHandlerAdapter {
public enum Events {
EXCEPTION_CAUGHT,
USER_EVENT,
READ_SUSPEND,
INACTIVE,
ACTIVE,
UNREGISTERED,
REGISTERED,
INBOUND_BufFER_UPDATED
}
private final Queue<Events> events;
public EventRecordHandler(Queue<Events> events) {
this.events = events;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
events.add(Events.EXCEPTION_CAUGHT);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
events.add(Events.USER_EVENT);
}
@Override
public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
events.add(Events.READ_SUSPEND);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
events.add(Events.INACTIVE);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
events.add(Events.ACTIVE);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
events.add(Events.UNREGISTERED);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
events.add(Events.REGISTERED);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
events.add(Events.INBOUND_BufFER_UPDATED);
}
}
} }