From e80fb65c363369d97fa669908477e736ee91c492 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 22 Apr 2013 19:40:23 +0900 Subject: [PATCH] Clean up the pipeline implementation / Ensure Embedded*Channel does not run pending tasks immediately - Replace ugly 'prev != null' check with explicit event scheduling - Fix an incorrect flag operation in freeHandlerBuffersAfterRemoval() - Fix a bug in AbstractEmbeddedChannel.doRegister where it makes pending tasks immediately, where the pending tasks actually triggers inbound events - Remove unnecessary suppression of inboundBufferUpdated() event in DefaultChannelPipeline, which potentially hides an event ordering bug. Unfortunately, I don't remember why I added it in cca35454d214611792067c89b78f7477aae3a323. --- .../io/netty/channel/AbstractChannel.java | 50 ++++++++++++-- .../channel/DefaultChannelHandlerContext.java | 68 +++++++++++++------ .../netty/channel/DefaultChannelPipeline.java | 11 --- .../embedded/AbstractEmbeddedChannel.java | 7 +- 4 files changed, 90 insertions(+), 46 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 3ecc4dd290..3dc0f3cc5f 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -675,7 +675,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha doDisconnect(); promise.setSuccess(); if (wasActive && !isActive()) { - pipeline.fireChannelInactive(); + invokeLater(new Runnable() { + @Override + public void run() { + pipeline.fireChannelInactive(); + } + }); } } catch (Throwable t) { promise.setFailure(t); @@ -710,7 +715,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha flushFutureNotifier.notifyFlushFutures(closedChannelException); if (wasActive && !isActive()) { - pipeline.fireChannelInactive(); + invokeLater(new Runnable() { + @Override + public void run() { + pipeline.fireChannelInactive(); + } + }); } deregister(voidFuture()); @@ -754,7 +764,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (registered) { registered = false; promise.setSuccess(); - pipeline.fireChannelUnregistered(); + invokeLater(new Runnable() { + @Override + public void run() { + pipeline.fireChannelUnregistered(); + } + }); } else { // Some transports like local and AIO does not allow the deregistration of // an open channel. Their doDeregister() calls close(). Consequently, @@ -785,8 +800,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (eventLoop().inEventLoop()) { try { doBeginRead(); - } catch (Exception e) { - pipeline().fireExceptionCaught(e); + } catch (final Exception e) { + invokeLater(new Runnable() { + @Override + public void run() { + pipeline.fireExceptionCaught(e); + } + }); close(unsafe().voidFuture()); } } else { @@ -849,10 +869,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private void flush0() { if (!inFlushNow) { // Avoid re-entrance try { + // Flush immediately only when there's no pending flush. + // If there's a pending flush operation, event loop will call flushNow() later, + // and thus there's no need to call it now. if (!isFlushPending()) { flushNow(); - } else { - // Event loop will call flushNow() later by itself. } } catch (Throwable t) { flushFutureNotifier.notifyFlushFutures(t); @@ -930,6 +951,21 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } close(voidFuture()); } + + private void invokeLater(Runnable task) { + // This method is used by outbound operation implementations to trigger an inbound event later. + // They do not trigger an inbound event immediately because an outbound operation might have been + // triggered by another inbound event handler method. If fired immediately, the call stack + // will look like this for example: + // + // handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection. + // -> handlerA.ctx.close() + // -> channel.unsafe.close() + // -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet + // + // which means the execution of two inbound handler methods of the same handler overlap undesirably. + eventLoop().execute(task); + } } /** diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index c40c1d0b2d..5b2e3f2683 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -24,11 +24,14 @@ import io.netty.buffer.Unpooled; import io.netty.util.DefaultAttributeMap; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.internal.PlatformDependent; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static io.netty.channel.DefaultChannelPipeline.*; @@ -309,17 +312,42 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements void initHeadHandler() { // Must be called for the head handler. - HeadHandler h = (HeadHandler) handler; - if (h.initialized) { - return; + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + HeadHandler h = (HeadHandler) handler; + if (h.initialized) { + return; + } + + h.init(this); + h.initialized = true; + outByteBuf = h.byteSink; + outMsgBuf = h.msgSink; + } else { + Future f = executor.submit(new Runnable() { + @Override + public void run() { + initHeadHandler(); + } + }); + + boolean interrupted = false; + try { + while (!f.isDone()) { + try { + f.get(); + } catch (InterruptedException e) { + interrupted = true; + } catch (ExecutionException e) { + PlatformDependent.throwException(e); + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } } - - assert executor().inEventLoop(); - - h.init(this); - h.initialized = true; - outByteBuf = h.byteSink; - outMsgBuf = h.msgSink; } private boolean flushInboundBridge() { @@ -401,8 +429,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } private void freeHandlerBuffersAfterRemoval() { - if (flags == FLAG_REMOVED) { // Removed, but not freed yet - flags |= FLAG_FREED; + int flags = this.flags; + if ((flags & FLAG_REMOVED) != 0 && (flags & FLAG_FREED) == 0) { // Removed, but not freed yet + this.flags |= FLAG_FREED; final ChannelHandler handler = handler(); @@ -691,7 +720,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public ChannelHandlerContext fireChannelUnregistered() { final DefaultChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); - if (prev != null && executor.inEventLoop()) { + if (executor.inEventLoop()) { next.invokeChannelUnregistered(); } else { executor.execute(new Runnable() { @@ -743,7 +772,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public ChannelHandlerContext fireChannelInactive() { final DefaultChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); - if (prev != null && executor.inEventLoop()) { + if (executor.inEventLoop()) { next.invokeChannelInactive(); } else { executor.execute(new Runnable() { @@ -778,7 +807,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private void invokeExceptionCaught(final Throwable cause) { EventExecutor executor = executor(); - if (prev != null && executor.inEventLoop()) { + if (executor.inEventLoop()) { invokeExceptionCaught0(cause); } else { try { @@ -897,7 +926,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private void invokeInboundBufferUpdated() { ChannelStateHandler handler = (ChannelStateHandler) handler(); - if (handler instanceof ChannelInboundHandler) { for (;;) { try { @@ -1424,7 +1452,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements void invokeFreeInboundBuffer() { EventExecutor executor = executor(); - if (prev != null && executor.inEventLoop()) { + if (executor.inEventLoop()) { invokeFreeInboundBuffer0(); } else { executor.execute(new Runnable() { @@ -1517,11 +1545,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements return; } - if (handler() instanceof ChannelStateHandler) { - invokeExceptionCaught(cause); - } else { - findContextInbound().invokeExceptionCaught(cause); - } + invokeExceptionCaught(cause); } private static boolean inExceptionCaught(Throwable cause) { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 2e956f39b2..314dbff362 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -65,8 +65,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { private final Map name2ctx = new HashMap(4); - private boolean firedChannelActive; - private boolean fireInboundBufferUpdatedOnActivation; final Map childExecutors = new IdentityHashMap(); @@ -814,7 +812,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline fireChannelActive() { - firedChannelActive = true; head.initHeadHandler(); head.fireChannelActive(); @@ -822,10 +819,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { channel.read(); } - if (fireInboundBufferUpdatedOnActivation) { - fireInboundBufferUpdatedOnActivation = false; - head.fireInboundBufferUpdated(); - } return this; } @@ -853,10 +846,6 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline fireInboundBufferUpdated() { - if (!firedChannelActive) { - fireInboundBufferUpdatedOnActivation = true; - return this; - } head.fireInboundBufferUpdated(); return this; } diff --git a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java index c9cc5cc45c..4bb690789d 100755 --- a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java @@ -226,12 +226,7 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel { @Override protected Runnable doDeregister() throws Exception { - return new Runnable() { - @Override - public void run() { - runPendingTasks(); - } - }; + return null; } @Override