From ca5554dfe7707d2cce712d1165a648129e0a9c84 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 5 Apr 2013 15:46:18 +0200 Subject: [PATCH] [#1236] Fix problem where adding a new ChannelHandler could block the eventloop This change also introduce a few other changes which was needed: * ChannelHandler.beforeAdd(...) and ChannelHandler.beforeRemove(...) were removed * ChannelHandler.afterAdd(...) -> handlerAdded(...) * ChannelHandler.afterRemoved(...) -> handlerRemoved(...) * SslHandler.handshake() -> SslHandler.hanshakeFuture() as the handshake is triggered automatically after the Channel becomes active --- .../codec/http/HttpContentDecoder.java | 4 +- .../codec/http/HttpContentEncoder.java | 4 +- .../codec/http/HttpObjectAggregator.java | 2 +- .../WebSocketClientProtocolHandler.java | 2 +- .../WebSocketServerProtocolHandler.java | 2 +- .../handler/codec/spdy/SpdyFrameEncoder.java | 2 +- .../codec/http/HttpObjecctAggregatorTest.java | 2 +- .../handler/codec/ByteToMessageCodec.java | 5 - .../codec/compression/JZlibEncoder.java | 2 +- .../codec/compression/JdkZlibEncoder.java | 2 +- .../client/WebSocketClientHandler.java | 2 +- .../securechat/SecureChatServerHandler.java | 9 +- .../java/io/netty/handler/ssl/SslHandler.java | 236 ++++++------------ .../handler/stream/ChunkedWriteHandler.java | 12 +- .../handler/timeout/IdleStateHandler.java | 4 +- .../handler/timeout/ReadTimeoutHandler.java | 4 +- .../AbstractTrafficShapingHandler.java | 2 +- .../traffic/ChannelTrafficShapingHandler.java | 10 +- .../transport/socket/SocketSslEchoTest.java | 10 +- .../transport/socket/SocketStartTlsTest.java | 6 +- .../java/io/netty/channel/ChannelHandler.java | 14 +- .../netty/channel/ChannelHandlerAdapter.java | 20 +- .../channel/CombinedChannelDuplexHandler.java | 35 +-- .../netty/channel/DefaultChannelPipeline.java | 236 ++++-------------- .../channel/DefaultChannelPipelineTest.java | 68 +++-- .../local/LocalTransportThreadModelTest.java | 200 +++++++++++++++ 26 files changed, 410 insertions(+), 485 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java index df3ce33c54..5137c8f368 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecoder.java @@ -202,9 +202,9 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder { } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java index de6abb2698..a3d72ab96d 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandler.java @@ -137,7 +137,7 @@ public class WebSocketClientProtocolHandler extends WebSocketProtocolHandler { } @Override - public void afterAdd(ChannelHandlerContext ctx) { + public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline cp = ctx.pipeline(); if (cp.get(WebSocketClientProtocolHandshakeHandler.class) == null) { // Add the WebSocketClientProtocolHandshakeHandler before this one. diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java index 2fed4a3490..aad5f9be07 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java @@ -80,7 +80,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler { } @Override - public void afterAdd(ChannelHandlerContext ctx) { + public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline cp = ctx.pipeline(); if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) { // Add the WebSocketHandshakeHandler before this one. diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java index dd872ddc02..23bcbaba46 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -60,7 +60,7 @@ public class SpdyFrameEncoder extends MessageToByteEncoder extends ChannelDuplexHandler outboundMsgMatcher = TypeParameterMatcher.get(outboundMessageType); } - @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - decoder.beforeAdd(ctx); - } - @Override public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { return decoder.newInboundBuffer(ctx); diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java index 2932f29173..584479ed73 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -404,7 +404,7 @@ public class JZlibEncoder extends ZlibEncoder { } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; } } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index 83c999a2e4..638b9ff16f 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -262,7 +262,7 @@ public class JdkZlibEncoder extends ZlibEncoder { } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; } } diff --git a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClientHandler.java b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClientHandler.java index f2bbd88b9e..ef70615268 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClientHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/client/WebSocketClientHandler.java @@ -64,7 +64,7 @@ public class WebSocketClientHandler extends ChannelInboundMessageHandlerAdapter< } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { handshakeFuture = ctx.newPromise(); } diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java b/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java index 029ab0055c..1b8b2334e2 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java @@ -16,13 +16,13 @@ package io.netty.example.securechat; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.net.InetAddress; import java.util.logging.Level; @@ -42,9 +42,10 @@ public class SecureChatServerHandler extends ChannelInboundMessageHandlerAdapter public void channelActive(final ChannelHandlerContext ctx) throws Exception { // Once session is secured, send a greeting and register the channel to the global channel // list so the channel received the messages from others. - ctx.pipeline().get(SslHandler.class).handshake().addListener(new ChannelFutureListener() { + ctx.pipeline().get(SslHandler.class).handshakeFuture().addListener( + new GenericFutureListener>() { @Override - public void operationComplete(ChannelFuture future) throws Exception { + public void operationComplete(Future future) throws Exception { ctx.write( "Welcome to " + InetAddress.getLocalHost().getHostName() + " secure chat service!\n"); diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index df3b0515e2..02285ae496 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -29,8 +29,11 @@ import io.netty.channel.ChannelOutboundByteHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelStateHandler; -import io.netty.channel.DefaultChannelPromise; import io.netty.channel.FileRegion; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.ImmediateExecutor; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; @@ -47,8 +50,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.DatagramChannel; import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; -import java.util.ArrayDeque; -import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -62,10 +63,9 @@ import java.util.regex.Pattern; * *

Beginning the handshake

*

- * You must make sure not to write a message while the - * {@linkplain #handshake() handshake} is in progress unless you are - * renegotiating. You will be notified by the {@link ChannelFuture} which is - * returned by the {@link #handshake()} method when the handshake + * You must make sure not to write a message while the handshake is in progress unless you are + * renegotiating. You will be notified by the {@link Future} which is + * returned by the {@link #handshakeFuture()} method when the handshake * process succeeds or fails. *

* Beside using the handshake {@link ChannelFuture} to get notified about the completation of the handshake it's @@ -128,7 +128,7 @@ import java.util.regex.Pattern; *

  • create a new {@link SslHandler} instance with {@code startTls} flag set * to {@code false},
  • *
  • insert the {@link SslHandler} to the {@link ChannelPipeline}, and
  • - *
  • Initiate SSL handshake by calling {@link SslHandler#handshake()}.
  • + *
  • Initiate SSL handshake.
  • * * *

    Known issues

    @@ -172,8 +172,8 @@ public class SslHandler private boolean sentFirstMessage; private WritableByteChannel bufferChannel; - private final Queue handshakePromises = new ArrayDeque(); - private final SSLEngineInboundCloseFuture sslCloseFuture = new SSLEngineInboundCloseFuture(); + private final LazyChannelPromise handshakePromise = new LazyChannelPromise(); + private final LazyChannelPromise sslCloseFuture = new LazyChannelPromise(); private final CloseNotifyListener closeNotifyWriteListener = new CloseNotifyListener(); private volatile long handshakeTimeoutMillis = 10000; @@ -286,68 +286,10 @@ public class SslHandler } /** - * Starts the SSL / TLS handshake and returns a {@link ChannelFuture} that will - * get notified once the handshake completes. + * Returns a {@link Future} that will get notified once the handshake completes. */ - public ChannelFuture handshake() { - return handshake(ctx.newPromise()); - } - - /** - * Starts an SSL / TLS handshake for the specified channel. - * - * @return a {@link ChannelPromise} which is notified when the handshake - * succeeds or fails. - */ - public ChannelFuture handshake(final ChannelPromise promise) { - final ChannelHandlerContext ctx = this.ctx; - - final ScheduledFuture timeoutFuture; - if (handshakeTimeoutMillis > 0) { - timeoutFuture = ctx.executor().schedule(new Runnable() { - @Override - public void run() { - if (promise.isDone()) { - return; - } - - SSLException e = new SSLException("handshake timed out"); - if (promise.tryFailure(e)) { - ctx.fireExceptionCaught(e); - ctx.close(); - } - } - }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); - } else { - timeoutFuture = null; - } - - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - if (timeoutFuture != null) { - timeoutFuture.cancel(false); - } - } - }); - - ctx.executor().execute(new Runnable() { - @Override - public void run() { - try { - engine.beginHandshake(); - handshakePromises.add(promise); - flush0(ctx, ctx.newPromise(), true); - } catch (Exception e) { - if (promise.tryFailure(e)) { - ctx.fireExceptionCaught(e); - ctx.close(); - } - } - } - }); - - return promise; + public Future handshakeFuture() { + return handshakePromise; } /** @@ -390,7 +332,7 @@ public class SslHandler * For more informations see the apidocs of {@link SSLEngine} * */ - public ChannelFuture sslCloseFuture() { + public Future sslCloseFuture() { return sslCloseFuture; } @@ -882,7 +824,7 @@ public class SslHandler switch (result.getStatus()) { case CLOSED: // notify about the CLOSED state of the SSLEngine. See #137 - sslCloseFuture.setClosed(); + sslCloseFuture.trySuccess(ctx.channel()); break; case BUFFER_UNDERFLOW: break loop; @@ -966,15 +908,7 @@ public class SslHandler * Notify all the handshake futures about the successfully handshake */ private void setHandshakeSuccess() { - try { - for (;;) { - ChannelPromise p = handshakePromises.poll(); - if (p == null) { - break; - } - p.setSuccess(); - } - } finally { + if (handshakePromise.trySuccess(ctx.channel())) { ctx.fireUserEventTriggered(HANDSHAKE_SUCCESS_EVENT); } } @@ -983,39 +917,25 @@ public class SslHandler * Notify all the handshake futures about the failure during the handshake. */ private void setHandshakeFailure(Throwable cause) { + // Release all resources such as internal buffers that SSLEngine + // is managing. + engine.closeOutbound(); + + final boolean disconnected = cause == null || cause instanceof ClosedChannelException; try { - // Release all resources such as internal buffers that SSLEngine - // is managing. - engine.closeOutbound(); - - final boolean disconnected = cause == null || cause instanceof ClosedChannelException; - try { - engine.closeInbound(); - } catch (SSLException e) { - if (!disconnected) { - logger.warn("SSLEngine.closeInbound() raised an exception after a handshake failure.", e); - } else if (!closeNotifyWriteListener.done) { - logger.warn("SSLEngine.closeInbound() raised an exception due to closed connection.", e); - } else { - // cause == null && sentCloseNotify - // closeInbound() will raise an exception with bogus truncation attack warning. - } + engine.closeInbound(); + } catch (SSLException e) { + if (!disconnected) { + logger.warn("SSLEngine.closeInbound() raised an exception after a handshake failure.", e); + } else if (!closeNotifyWriteListener.done) { + logger.warn("SSLEngine.closeInbound() raised an exception due to closed connection.", e); + } else { + // cause == null && sentCloseNotify + // closeInbound() will raise an exception with bogus truncation attack warning. } + } - if (!handshakePromises.isEmpty()) { - if (cause == null) { - cause = new ClosedChannelException(); - } - - for (;;) { - ChannelPromise p = handshakePromises.poll(); - if (p == null) { - break; - } - p.setFailure(cause); - } - } - } finally { + if (handshakePromise.tryFailure(cause)) { ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause)); } flush0(ctx, 0, cause); @@ -1040,33 +960,71 @@ public class SslHandler } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; - } - @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive()) { // channelActvie() event has been fired already, which means this.channelActive() will // not be invoked. We have to initialize here instead. - handshake(); + handshake0(); } else { // channelActive() event has not been fired yet. this.channelOpen() will be invoked // and initialization will occur there. } } + private Future handshake0() { + final ScheduledFuture timeoutFuture; + if (handshakeTimeoutMillis > 0) { + timeoutFuture = ctx.executor().schedule(new Runnable() { + @Override + public void run() { + if (handshakePromise.isDone()) { + return; + } + + SSLException e = new SSLException("handshake timed out"); + if (handshakePromise.tryFailure(e)) { + ctx.fireExceptionCaught(e); + ctx.close(); + } + } + }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); + } else { + timeoutFuture = null; + } + + handshakePromise.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future f) throws Exception { + if (timeoutFuture != null) { + timeoutFuture.cancel(false); + } + } + }); + try { + engine.beginHandshake(); + flush0(ctx, ctx.newPromise(), true); + } catch (Exception e) { + if (handshakePromise.tryFailure(e)) { + ctx.fireExceptionCaught(e); + ctx.close(); + } + } + return handshakePromise; + } + /** - * Calls {@link #handshake()} once the {@link Channel} is connected + * Issues a SSL handshake once connected when used in client-mode */ @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { if (!startTls && engine.getUseClientMode()) { // issue and handshake and add a listener to it which will fire an exception event if // an exception was thrown while doing the handshake - handshake().addListener(new ChannelFutureListener() { + handshake0().addListener(new GenericFutureListener>() { @Override - public void operationComplete(ChannelFuture future) throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { ctx.pipeline().fireExceptionCaught(future.cause()); ctx.close(); @@ -1074,7 +1032,6 @@ public class SslHandler } }); } - ctx.fireChannelActive(); } @@ -1131,43 +1088,14 @@ public class SslHandler } } - private final class SSLEngineInboundCloseFuture extends DefaultChannelPromise { - public SSLEngineInboundCloseFuture() { - super(null); - } - - void setClosed() { - super.trySuccess(); - } + private final class LazyChannelPromise extends DefaultPromise { @Override - public Channel channel() { + protected EventExecutor executor() { if (ctx == null) { - // Maybe we should better throw an IllegalStateException() ? - return null; - } else { - return ctx.channel(); + throw new IllegalStateException(); } - } - - @Override - public boolean trySuccess() { - return false; - } - - @Override - public boolean tryFailure(Throwable cause) { - return false; - } - - @Override - public ChannelPromise setSuccess() { - throw new IllegalStateException(); - } - - @Override - public ChannelPromise setFailure(Throwable cause) { - throw new IllegalStateException(); + return ctx.executor(); } } } diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 2d2eb478fa..a5fddb3de0 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -348,17 +348,11 @@ public class ChunkedWriteHandler } } - @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - // try to flush again a last time. - // - // See #304 - doFlush(ctx); - } - // This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + doFlush(ctx); + // Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the // ChannelFuture and the registered FutureListener. See #304 discard(ctx, new ChannelException(ChunkedWriteHandler.class.getSimpleName() + " removed from pipeline.")); diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 90576be75b..5ccf794809 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -210,7 +210,7 @@ public class IdleStateHandler extends ChannelStateHandlerAdapter implements Chan } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive() & ctx.channel().isRegistered()) { // channelActvie() event has been fired already, which means this.channelActive() will // not be invoked. We have to initialize here instead. @@ -222,7 +222,7 @@ public class IdleStateHandler extends ChannelStateHandlerAdapter implements Chan } @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { destroy(); } diff --git a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java index 1ae7f12e17..687445163e 100644 --- a/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/ReadTimeoutHandler.java @@ -103,7 +103,7 @@ public class ReadTimeoutHandler extends ChannelStateHandlerAdapter { } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive() && ctx.channel().isRegistered()) { // channelActvie() event has been fired already, which means this.channelActive() will // not be invoked. We have to initialize here instead. @@ -115,7 +115,7 @@ public class ReadTimeoutHandler extends ChannelStateHandlerAdapter { } @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { destroy(); } diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java index 039c06c5ec..1166c407a5 100644 --- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java @@ -313,7 +313,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler } @Override - public void beforeRemove(ChannelHandlerContext ctx) { + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { if (trafficCounter != null) { trafficCounter.stop(); } diff --git a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java index 0c26b6ba95..4890a6229b 100644 --- a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java @@ -85,18 +85,10 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" + ctx.channel().id(), checkInterval); setTrafficCounter(trafficCounter); trafficCounter.start(); } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - super.afterRemove(ctx); - if (trafficCounter != null) { - trafficCounter.stop(); - } - } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java index 6376196902..c5685a2211 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java @@ -17,23 +17,17 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.BufUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandlerAdapter; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOperationHandlerAdapter; -import io.netty.channel.ChannelOutboundMessageHandlerAdapter; -import io.netty.channel.ChannelPromise; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.ByteToByteEncoder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.testsuite.util.BogusSslContextFactory; +import io.netty.util.concurrent.Future; import org.junit.Test; import javax.net.ssl.SSLEngine; @@ -105,7 +99,7 @@ public class SocketSslEchoTest extends AbstractSocketTest { Channel sc = sb.bind().sync().channel(); Channel cc = cb.connect().sync().channel(); - ChannelFuture hf = cc.pipeline().get(SslHandler.class).handshake(); + Future hf = cc.pipeline().get(SslHandler.class).handshakeFuture(); cc.write(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE)); final AtomicBoolean firstByteWriteFutureDone = new AtomicBoolean(); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index 46d4cce25d..1d06f2ff93 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -19,7 +19,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.BufType; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInitializer; @@ -34,6 +33,7 @@ import io.netty.handler.logging.ByteLoggingHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.ssl.SslHandler; import io.netty.testsuite.util.BogusSslContextFactory; +import io.netty.util.concurrent.Future; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -145,7 +145,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { private class StartTlsClientHandler extends ChannelInboundMessageHandlerAdapter { private final SslHandler sslHandler; - private ChannelFuture handshakeFuture; + private Future handshakeFuture; final AtomicReference exception = new AtomicReference(); StartTlsClientHandler(SSLEngine engine) { @@ -163,7 +163,7 @@ public class SocketStartTlsTest extends AbstractSocketTest { public void messageReceived(final ChannelHandlerContext ctx, String msg) throws Exception { if ("StartTlsResponse".equals(msg)) { ctx.pipeline().addAfter("logger", "ssl", sslHandler); - handshakeFuture = sslHandler.handshake(); + handshakeFuture = sslHandler.handshakeFuture(); ctx.write("EncryptedRequest\n"); return; } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandler.java b/transport/src/main/java/io/netty/channel/ChannelHandler.java index 77fd56f326..11458bfbf1 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandler.java @@ -189,25 +189,15 @@ import java.lang.annotation.Target; */ public interface ChannelHandler { - /** - * Gets called before the {@link ChannelHandler} is added to the actual context. - */ - void beforeAdd(ChannelHandlerContext ctx) throws Exception; - /** * Gets called after the {@link ChannelHandler} was added to the actual context. */ - void afterAdd(ChannelHandlerContext ctx) throws Exception; - - /** - * Gets called before the {@link ChannelHandler} is removed from the actual context. - */ - void beforeRemove(ChannelHandlerContext ctx) throws Exception; + void handlerAdded(ChannelHandlerContext ctx) throws Exception; /** * Gets called after the {@link ChannelHandler} was removed from the actual context. */ - void afterRemove(ChannelHandlerContext ctx) throws Exception; + void handlerRemoved(ChannelHandlerContext ctx) throws Exception; /** * Gets called if a {@link Throwable} was thrown. diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index 24735a8b8e..1520f50b32 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -36,7 +36,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler { * Do nothing by default, sub-classes may override this method. */ @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP } @@ -44,23 +44,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler { * Do nothing by default, sub-classes may override this method. */ @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { - // NOOP - } - - /** - * Do nothing by default, sub-classes may override this method. - */ - @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - // NOOP - } - - /** - * Do nothing by default, sub-classes may override this method. - */ - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP } diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java index 3329197937..ab7faf2e6c 100644 --- a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java @@ -117,45 +117,26 @@ public class CombinedChannelDuplexHandler extends ChannelDuplexHandler { } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (stateHandler == null) { throw new IllegalStateException( "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() + - " if " + CombinedChannelDuplexHandler.class.getSimpleName() + - " was constructed with the default constructor."); + " if " + CombinedChannelDuplexHandler.class.getSimpleName() + + " was constructed with the default constructor."); } - try { - stateHandler.beforeAdd(ctx); + stateHandler.handlerAdded(ctx); } finally { - operationHandler.beforeAdd(ctx); + operationHandler.handlerAdded(ctx); } } @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { try { - stateHandler.afterAdd(ctx); + stateHandler.handlerRemoved(ctx); } finally { - operationHandler.afterAdd(ctx); - } - } - - @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - try { - stateHandler.beforeRemove(ctx); - } finally { - operationHandler.beforeRemove(ctx); - } - } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - try { - stateHandler.afterRemove(ctx); - } finally { - operationHandler.afterRemove(ctx); + operationHandler.handlerRemoved(ctx); } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index b71d43df6e..b5229aa56a 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -113,31 +113,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) { - final DefaultChannelHandlerContext newCtx; - synchronized (this) { checkDuplicateName(name); - newCtx = new DefaultChannelHandlerContext(this, group, name, handler); - - if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addFirst0(name, newCtx); - return this; - } + DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); + addFirst0(name, newCtx); } - // Run the following 'waiting' code outside of the above synchronized block - // in order to avoid deadlock - - executeOnEventLoop(newCtx, new Runnable() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - checkDuplicateName(name); - addFirst0(name, newCtx); - } - } - }); - return this; } @@ -163,31 +144,13 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { - final DefaultChannelHandlerContext newCtx; - synchronized (this) { checkDuplicateName(name); - newCtx = new DefaultChannelHandlerContext(this, group, name, handler); - if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addLast0(name, newCtx); - return this; - } + DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); + addLast0(name, newCtx); } - // Run the following 'waiting' code outside of the above synchronized block - // in order to avoid deadlock - - executeOnEventLoop(newCtx, new Runnable() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - checkDuplicateName(name); - addLast0(name, newCtx); - } - } - }); - return this; } @@ -215,33 +178,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addBefore( EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { - final DefaultChannelHandlerContext ctx; - final DefaultChannelHandlerContext newCtx; - synchronized (this) { - ctx = getContextOrDie(baseName); + DefaultChannelHandlerContext ctx = getContextOrDie(baseName); checkDuplicateName(name); - newCtx = new DefaultChannelHandlerContext(this, group, name, handler); - - if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addBefore0(name, ctx, newCtx); - return this; - } + DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); + addBefore0(name, ctx, newCtx); } - - // Run the following 'waiting' code outside of the above synchronized block - // in order to avoid deadlock - - executeOnEventLoop(newCtx, new Runnable() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - checkDuplicateName(name); - addBefore0(name, ctx, newCtx); - } - } - }); - return this; } @@ -267,33 +209,14 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline addAfter( EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) { - final DefaultChannelHandlerContext ctx; - final DefaultChannelHandlerContext newCtx; - synchronized (this) { - ctx = getContextOrDie(baseName); + DefaultChannelHandlerContext ctx = getContextOrDie(baseName); checkDuplicateName(name); - newCtx = new DefaultChannelHandlerContext(this, group, name, handler); + DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); - if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addAfter0(name, ctx, newCtx); - return this; - } + addAfter0(name, ctx, newCtx); } - // Run the following 'waiting' code outside of the above synchronized block - // in order to avoid deadlock - - executeOnEventLoop(newCtx, new Runnable() { - @Override - public void run() { - synchronized (DefaultChannelPipeline.this) { - checkDuplicateName(name); - addAfter0(name, ctx, newCtx); - } - } - }); - return this; } @@ -448,8 +371,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { } private void remove0(DefaultChannelHandlerContext ctx) { - callBeforeRemove(ctx); - DefaultChannelHandlerContext prev = ctx.prev; DefaultChannelHandlerContext next = ctx.next; prev.next = next; @@ -542,7 +463,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { newCtx.prev = prev; newCtx.next = next; - callBeforeRemove(ctx); callBeforeAdd(newCtx); prev.next = newCtx; @@ -553,36 +473,9 @@ final class DefaultChannelPipeline implements ChannelPipeline { } name2ctx.put(newName, newCtx); - ChannelPipelineException removeException = null; - ChannelPipelineException addException = null; - boolean removed = false; - try { - callAfterRemove(ctx, newCtx, newCtx); - removed = true; - } catch (ChannelPipelineException e) { - removeException = e; - } - - boolean added = false; - try { - callAfterAdd(newCtx); - added = true; - } catch (ChannelPipelineException e) { - addException = e; - } - - if (!removed && !added) { - logger.warn(removeException.getMessage(), removeException); - logger.warn(addException.getMessage(), addException); - throw new ChannelPipelineException( - "Both " + ctx.handler().getClass().getName() + - ".afterRemove() and " + newCtx.handler().getClass().getName() + - ".afterAdd() failed; see logs."); - } else if (!removed) { - throw removeException; - } else if (!added) { - throw addException; - } + // remove old and add new + callAfterRemove(ctx, newCtx, newCtx); + callAfterAdd(newCtx); } private static void callBeforeAdd(ChannelHandlerContext ctx) { @@ -596,18 +489,24 @@ final class DefaultChannelPipeline implements ChannelPipeline { } h.added = true; } - try { - handler.beforeAdd(ctx); - } catch (Throwable t) { - throw new ChannelPipelineException( - handler.getClass().getName() + - ".beforeAdd() has thrown an exception; not adding.", t); - } } - private void callAfterAdd(ChannelHandlerContext ctx) { + private void callAfterAdd(final ChannelHandlerContext ctx) { + if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) { + ctx.executor().execute(new Runnable() { + @Override + public void run() { + callAfterAdd0(ctx); + } + }); + return; + } + callAfterAdd0(ctx); + } + + private void callAfterAdd0(final ChannelHandlerContext ctx) { try { - ctx.handler().afterAdd(ctx); + ctx.handler().handlerAdded(ctx); } catch (Throwable t) { boolean removed = false; try { @@ -620,28 +519,33 @@ final class DefaultChannelPipeline implements ChannelPipeline { } if (removed) { - throw new ChannelPipelineException( + fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + - ".afterAdd() has thrown an exception; removed.", t); + ".afterAdd() has thrown an exception; removed.", t)); } else { - throw new ChannelPipelineException( + fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + - ".afterAdd() has thrown an exception; also failed to remove.", t); + ".afterAdd() has thrown an exception; also failed to remove.", t)); } } } - private static void callBeforeRemove(ChannelHandlerContext ctx) { - try { - ctx.handler().beforeRemove(ctx); - } catch (Throwable t) { - throw new ChannelPipelineException( - ctx.handler().getClass().getName() + - ".beforeRemove() has thrown an exception; not removing.", t); + private void callAfterRemove( + final DefaultChannelHandlerContext ctx, final DefaultChannelHandlerContext ctxPrev, + final DefaultChannelHandlerContext ctxNext) { + if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) { + ctx.executor().execute(new Runnable() { + @Override + public void run() { + callAfterRemove0(ctx, ctxPrev, ctxNext); + } + }); + return; } + callAfterRemove0(ctx, ctxPrev, ctxNext); } - private static void callAfterRemove( + private void callAfterRemove0( final DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext ctxPrev, DefaultChannelHandlerContext ctxNext) { @@ -649,11 +553,11 @@ final class DefaultChannelPipeline implements ChannelPipeline { // Notify the complete removal. try { - handler.afterRemove(ctx); + handler.handlerRemoved(ctx); } catch (Throwable t) { - throw new ChannelPipelineException( + fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + - ".afterRemove() has thrown an exception.", t); + ".afterRemove() has thrown an exception.", t)); } ctx.forwardBufferContent(ctxPrev, ctxNext); @@ -661,26 +565,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { ctx.setRemoved(); } - /** - * Executes a task on the event loop and waits for it to finish. If the task is interrupted, then the - * current thread will be interrupted. It is expected that the task performs any appropriate locking. - *

    - * If the {@link Runnable#run()} call throws a {@link Throwable}, but it is not an instance of - * {@link Error} or {@link RuntimeException}, then it is wrapped inside a - * {@link ChannelPipelineException} and that is thrown instead.

    - * - * @param r execute this runnable - * @see Runnable#run() - * @see Future#get() - * @throws Error if the task threw this. - * @throws RuntimeException if the task threw this. - * @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of - * {@link Throwable}. - */ - private static void executeOnEventLoop(DefaultChannelHandlerContext ctx, Runnable r) { - waitForFuture(ctx.executor().submit(r)); - } - /** * Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted. * It is expected that the task performs any appropriate locking. @@ -1138,16 +1022,10 @@ final class DefaultChannelPipeline implements ChannelPipeline { public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception { } @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { } + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { } - - @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { } + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { } @@ -1231,22 +1109,12 @@ final class DefaultChannelPipeline implements ChannelPipeline { } @Override - public final void beforeAdd(ChannelHandlerContext ctx) throws Exception { + public final void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override - public final void afterAdd(ChannelHandlerContext ctx) throws Exception { - // NOOP - } - - @Override - public final void beforeRemove(ChannelHandlerContext ctx) throws Exception { - // NOOP - } - - @Override - public final void afterRemove(ChannelHandlerContext ctx) throws Exception { + public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP } diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index b0093c516f..2610602b74 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -632,37 +632,53 @@ public class DefaultChannelPipelineTest { assertTrue(((ChannelOutboundMessageHandlerImpl) handler2.operationHandler()).flushed); } - @Test - public void testLifeCycleAware() { + @Test(timeout = 20000) + public void testLifeCycleAware() throws Exception { LocalChannel channel = new LocalChannel(); LocalEventLoopGroup group = new LocalEventLoopGroup(); group.register(channel).awaitUninterruptibly(); final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - List handlers = new ArrayList(); - + final List handlers = new ArrayList(); + final CountDownLatch addLatch = new CountDownLatch(20); for (int i = 0; i < 20; i++) { - LifeCycleAwareTestHandler handler = new LifeCycleAwareTestHandler("handler-" + i); + final LifeCycleAwareTestHandler handler = new LifeCycleAwareTestHandler("handler-" + i); // Add handler. pipeline.addFirst(handler.name, handler); + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + // Validate handler life-cycle methods called. + handler.validate(true, false); - // Validate handler life-cycle methods called. - handler.validate(true, true, false, false); + // Store handler into the list. + handlers.add(handler); - // Store handler into the list. - handlers.add(handler); + addLatch.countDown(); + } + }); } + addLatch.await(); // Change the order of remove operations over all handlers in the pipeline. Collections.shuffle(handlers); - for (LifeCycleAwareTestHandler handler : handlers) { + final CountDownLatch removeLatch = new CountDownLatch(20); + + for (final LifeCycleAwareTestHandler handler : handlers) { assertSame(handler, pipeline.remove(handler.name)); - // Validate handler life-cycle methods called. - handler.validate(true, true, true, true); + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + // Validate handler life-cycle methods called. + handler.validate(true, true); + removeLatch.countDown(); + } + }); } + removeLatch.await(); } @Test @@ -888,9 +904,7 @@ public class DefaultChannelPipelineTest { private static final class LifeCycleAwareTestHandler extends ChannelHandlerAdapter { private final String name; - private boolean beforeAdd; private boolean afterAdd; - private boolean beforeRemove; private boolean afterRemove; /** @@ -902,37 +916,21 @@ public class DefaultChannelPipelineTest { this.name = name; } - public void validate(boolean beforeAdd, boolean afterAdd, boolean beforeRemove, boolean afterRemove) { - assertEquals(name, beforeAdd, this.beforeAdd); + public void validate(boolean afterAdd, boolean afterRemove) { assertEquals(name, afterAdd, this.afterAdd); - assertEquals(name, beforeRemove, this.beforeRemove); assertEquals(name, afterRemove, this.afterRemove); } @Override - public void beforeAdd(ChannelHandlerContext ctx) { - validate(false, false, false, false); - - beforeAdd = true; - } - - @Override - public void afterAdd(ChannelHandlerContext ctx) { - validate(true, false, false, false); + public void handlerAdded(ChannelHandlerContext ctx) { + validate(false, false); afterAdd = true; } @Override - public void beforeRemove(ChannelHandlerContext ctx) { - validate(true, true, false, false); - - beforeRemove = true; - } - - @Override - public void afterRemove(ChannelHandlerContext ctx) { - validate(true, true, true, false); + public void handlerRemoved(ChannelHandlerContext ctx) { + validate(true, false); afterRemove = true; } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 27e0004d1b..d6c2bb70b8 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -21,6 +21,7 @@ import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; @@ -30,6 +31,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundByteHandler; import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelStateHandlerAdapter; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; @@ -39,9 +41,14 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import java.util.Deque; import java.util.HashSet; +import java.util.LinkedList; import java.util.Queue; +import java.util.Random; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -330,6 +337,127 @@ public class LocalTransportThreadModelTest { } } + @Test(timeout = 30000) + @Ignore("needs to get fixed") + public void testConcurrentAddRemove() throws Throwable { + EventLoopGroup l = new LocalEventLoopGroup(4, new PrefixThreadFactory("l")); + EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e1")); + EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e2")); + EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e3")); + EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e4")); + EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new PrefixThreadFactory("e5")); + + final EventExecutorGroup[] groups = { e1, e2, e3, e4, e5 }; + try { + Deque events = new ConcurrentLinkedDeque(); + final EventForwardHandler h1 = new EventForwardHandler(); + final EventForwardHandler h2 = new EventForwardHandler(); + final EventForwardHandler h3 = new EventForwardHandler(); + final EventForwardHandler h4 = new EventForwardHandler(); + final EventForwardHandler h5 = new EventForwardHandler(); + final EventRecordHandler h6 = new EventRecordHandler(events); + + final Channel ch = new LocalChannel(); + + // inbound: int -> byte[4] -> int -> int -> byte[4] -> int -> /dev/null + // outbound: int -> int -> byte[4] -> int -> int -> byte[4] -> /dev/null + ch.pipeline().addLast(h1) + .addLast(e1, h2) + .addLast(e2, h3) + .addLast(e3, h4) + .addLast(e4, h5) + .addLast(e5, "recorder", h6); + + l.register(ch).sync().channel().connect(localAddr).sync(); + + final int TOTAL_CNT = 8192; + final LinkedList expectedEvents = events(TOTAL_CNT); + + Throwable cause = new Throwable(); + + Thread pipelineModifier = new Thread(new Runnable() { + @Override + public void run() { + Random random = new Random(); + + while (true) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + return; + } + if (!ch.isRegistered()) { + continue; + } + //EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)]; + ChannelHandler handler = ch.pipeline().removeFirst(); + ch.pipeline().addBefore(groups[random.nextInt(groups.length)], "recorder", + UUID.randomUUID().toString(), handler); + } + } + }); + pipelineModifier.setDaemon(true); + pipelineModifier.start(); + for (int i = 0; i < TOTAL_CNT; i++) { + EventRecordHandler.Events event = expectedEvents.get(i); + switch (event) { + case EXCEPTION_CAUGHT: + ch.pipeline().fireExceptionCaught(cause); + break; + case INBOUND_BufFER_UPDATED: + ch.pipeline().fireInboundBufferUpdated(); + break; + case READ_SUSPEND: + ch.pipeline().fireChannelReadSuspended(); + break; + case USER_EVENT: + ch.pipeline().fireUserEventTriggered(""); + break; + } + } + + while (events.size() < TOTAL_CNT + 2) { + System.out.println(events.size() + " < " + (TOTAL_CNT + 2)); + Thread.sleep(10); + } + + ch.close().sync(); + + expectedEvents.addFirst(EventRecordHandler.Events.ACTIVE); + expectedEvents.addFirst(EventRecordHandler.Events.REGISTERED); + expectedEvents.addLast(EventRecordHandler.Events.INACTIVE); + expectedEvents.addLast(EventRecordHandler.Events.UNREGISTERED); + + for (;;) { + EventRecordHandler.Events event = events.poll(); + if (event == null) { + Assert.assertTrue(expectedEvents.isEmpty()); + break; + } + Assert.assertEquals(expectedEvents.poll(), event); + } + } finally { + l.shutdown(); + e1.shutdown(); + e2.shutdown(); + e3.shutdown(); + e4.shutdown(); + e5.shutdown(); + } + } + + private static LinkedList events(int size) { + EventRecordHandler.Events[] events = { EventRecordHandler.Events.EXCEPTION_CAUGHT, + EventRecordHandler.Events.USER_EVENT, EventRecordHandler.Events.INBOUND_BufFER_UPDATED, + EventRecordHandler.Events.READ_SUSPEND}; + Random random = new Random(); + LinkedList expectedEvents = new LinkedList(); + for (int i = 0; i < size; i++) { + expectedEvents.add(events[random.nextInt(events.length)]); + } + return expectedEvents; + } + private static class ThreadNameAuditor extends ChannelDuplexHandler implements ChannelInboundMessageHandler, @@ -794,4 +922,76 @@ public class LocalTransportThreadModelTest { return t; } } + + @ChannelHandler.Sharable + private static final class EventForwardHandler extends ChannelDuplexHandler { + @Override + public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.flush(promise); + } + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { + ctx.fireInboundBufferUpdated(); + } + } + + private static final class EventRecordHandler extends ChannelStateHandlerAdapter { + public enum Events { + EXCEPTION_CAUGHT, + USER_EVENT, + READ_SUSPEND, + INACTIVE, + ACTIVE, + UNREGISTERED, + REGISTERED, + INBOUND_BufFER_UPDATED + } + + private final Queue events; + + public EventRecordHandler(Queue events) { + this.events = events; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + events.add(Events.EXCEPTION_CAUGHT); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + events.add(Events.USER_EVENT); + } + + @Override + public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception { + events.add(Events.READ_SUSPEND); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + events.add(Events.INACTIVE); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + events.add(Events.ACTIVE); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + events.add(Events.UNREGISTERED); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + events.add(Events.REGISTERED); + } + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { + events.add(Events.INBOUND_BufFER_UPDATED); + } + } }