From a507ea97ef42938f62083996abc8f5477a4cac56 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 8 Jun 2012 23:11:15 +0900 Subject: [PATCH] Move some logic from DefaultChannelPipeline to DefaultChannelHandlerContext - Using the fact that head is always non-null, we can remove some code in DefaultChannelPipeline and move some to DefaultChannelHandlerContext --- .../channel/DefaultChannelHandlerContext.java | 157 ++++++++++-- .../netty/channel/DefaultChannelPipeline.java | 230 ++---------------- 2 files changed, 160 insertions(+), 227 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 06874290a8..7710848cdd 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -129,7 +129,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements DefaultChannelHandlerContext.this.next, ChannelHandlerType.STATE); if (next != null) { next.fillBridge(); - DefaultChannelPipeline.fireInboundBufferUpdated(next); + EventExecutor executor = next.executor(); + if (executor.inEventLoop()) { + next.curCtxFireInboundBufferUpdatedTask.run(); + } else { + executor.execute(next.curCtxFireInboundBufferUpdatedTask); + } } } }; @@ -381,12 +386,30 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public boolean hasNextInboundByteBuffer() { - return DefaultChannelPipeline.hasNextInboundByteBuffer(next); + DefaultChannelHandlerContext ctx = next; + for (;;) { + if (ctx == null) { + return false; + } + if (ctx.inByteBridge != null) { + return true; + } + ctx = ctx.next; + } } @Override public boolean hasNextInboundMessageBuffer() { - return DefaultChannelPipeline.hasNextInboundMessageBuffer(next); + DefaultChannelHandlerContext ctx = next; + for (;;) { + if (ctx == null) { + return false; + } + if (ctx.inMsgBridge != null) { + return true; + } + ctx = ctx.next; + } } @Override @@ -401,12 +424,55 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public ChannelBuffer nextInboundByteBuffer() { - return DefaultChannelPipeline.nextInboundByteBuffer(next); + DefaultChannelHandlerContext ctx = next; + final Thread currentThread = Thread.currentThread(); + for (;;) { + if (ctx == null) { + throw new NoSuchBufferException(); + } + if (ctx.inByteBuf != null) { + if (ctx.executor().inEventLoop(currentThread)) { + return ctx.inByteBuf; + } else { + StreamBridge bridge = ctx.inByteBridge.get(); + if (bridge == null) { + bridge = new StreamBridge(); + if (!ctx.inByteBridge.compareAndSet(null, bridge)) { + bridge = ctx.inByteBridge.get(); + } + } + return bridge.byteBuf; + } + } + ctx = ctx.next; + } } @Override public Queue nextInboundMessageBuffer() { - return DefaultChannelPipeline.nextInboundMessageBuffer(next); + DefaultChannelHandlerContext ctx = next; + final Thread currentThread = Thread.currentThread(); + for (;;) { + if (ctx == null) { + throw new NoSuchBufferException(); + } + + if (ctx.inMsgBuf != null) { + if (ctx.executor().inEventLoop(currentThread)) { + return ctx.inMsgBuf; + } else { + MessageBridge bridge = ctx.inMsgBridge.get(); + if (bridge == null) { + bridge = new MessageBridge(); + if (!ctx.inMsgBridge.compareAndSet(null, bridge)) { + bridge = ctx.inMsgBridge.get(); + } + } + return bridge.msgBuf; + } + } + ctx = ctx.next; + } } @Override @@ -423,7 +489,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public void fireChannelRegistered() { DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE); if (next != null) { - DefaultChannelPipeline.fireChannelRegistered(next); + EventExecutor executor = next.executor(); + if (executor.inEventLoop()) { + next.fireChannelRegisteredTask.run(); + } else { + executor.execute(next.fireChannelRegisteredTask); + } } } @@ -431,7 +502,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public void fireChannelUnregistered() { DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE); if (next != null) { - DefaultChannelPipeline.fireChannelUnregistered(next); + EventExecutor executor = next.executor(); + if (executor.inEventLoop()) { + next.fireChannelUnregisteredTask.run(); + } else { + executor.execute(next.fireChannelUnregisteredTask); + } } } @@ -439,7 +515,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public void fireChannelActive() { DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE); if (next != null) { - DefaultChannelPipeline.fireChannelActive(next); + EventExecutor executor = next.executor(); + if (executor.inEventLoop()) { + next.fireChannelActiveTask.run(); + } else { + executor.execute(next.fireChannelActiveTask); + } } } @@ -447,25 +528,73 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public void fireChannelInactive() { DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE); if (next != null) { - DefaultChannelPipeline.fireChannelInactive(next); + EventExecutor executor = next.executor(); + if (executor.inEventLoop()) { + next.fireChannelInactiveTask.run(); + } else { + executor.execute(next.fireChannelInactiveTask); + } } } @Override - public void fireExceptionCaught(Throwable cause) { + public void fireExceptionCaught(final Throwable cause) { + if (cause == null) { + throw new NullPointerException("cause"); + } + DefaultChannelHandlerContext next = this.next; if (next != null) { - pipeline.fireExceptionCaught(next, cause); + EventExecutor executor = next.executor(); + if (executor.inEventLoop()) { + try { + next.handler().exceptionCaught(next, cause); + } catch (Throwable t) { + if (logger.isWarnEnabled()) { + logger.warn( + "An exception was thrown by a user handler's " + + "exceptionCaught() method while handling the following exception:", cause); + } + } + } else { + executor.execute(new Runnable() { + @Override + public void run() { + fireExceptionCaught(cause); + } + }); + } } else { - DefaultChannelPipeline.logTerminalException(cause); + logger.warn( + "An exceptionCaught() event was fired, and it reached at the end of the " + + "pipeline. It usually means the last inbound handler in the pipeline did not " + + "handle the exception.", cause); } } @Override - public void fireUserEventTriggered(Object event) { + public void fireUserEventTriggered(final Object event) { + if (event == null) { + throw new NullPointerException("event"); + } + DefaultChannelHandlerContext next = this.next; if (next != null) { - pipeline.fireUserEventTriggered(next, event); + EventExecutor executor = next.executor(); + if (executor.inEventLoop()) { + try { + next.handler().userEventTriggered(next, event); + } catch (Throwable t) { + pipeline.notifyHandlerException(t); + } + } else { + executor.execute(new Runnable() { + @Override + public void run() { + fireUserEventTriggered(event); + } + }); + } } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index f65dcd2c18..a469fd0c5b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -887,7 +887,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NoSuchBufferException( "The first inbound buffer of this channel must be a message buffer."); } - return nextInboundMessageBuffer(head.next); + return head.nextInboundMessageBuffer(); } @Override @@ -896,7 +896,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NoSuchBufferException( "The first inbound buffer of this channel must be a byte buffer."); } - return nextInboundByteBuffer(head.next); + return head.nextInboundByteBuffer(); } @Override @@ -909,79 +909,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { return nextOutboundByteBuffer(tail); } - static boolean hasNextInboundByteBuffer(DefaultChannelHandlerContext ctx) { - for (;;) { - if (ctx == null) { - return false; - } - if (ctx.inByteBridge != null) { - return true; - } - ctx = ctx.next; - } - } - - static boolean hasNextInboundMessageBuffer(DefaultChannelHandlerContext ctx) { - for (;;) { - if (ctx == null) { - return false; - } - if (ctx.inMsgBridge != null) { - return true; - } - ctx = ctx.next; - } - } - - static ChannelBuffer nextInboundByteBuffer(DefaultChannelHandlerContext ctx) { - final Thread currentThread = Thread.currentThread(); - for (;;) { - if (ctx == null) { - throw new NoSuchBufferException(); - } - if (ctx.inByteBuf != null) { - if (ctx.executor().inEventLoop(currentThread)) { - return ctx.inByteBuf; - } else { - StreamBridge bridge = ctx.inByteBridge.get(); - if (bridge == null) { - bridge = new StreamBridge(); - if (!ctx.inByteBridge.compareAndSet(null, bridge)) { - bridge = ctx.inByteBridge.get(); - } - } - return bridge.byteBuf; - } - } - ctx = ctx.next; - } - } - - static Queue nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) { - final Thread currentThread = Thread.currentThread(); - for (;;) { - if (ctx == null) { - throw new NoSuchBufferException(); - } - - if (ctx.inMsgBuf != null) { - if (ctx.executor().inEventLoop(currentThread)) { - return ctx.inMsgBuf; - } else { - MessageBridge bridge = ctx.inMsgBridge.get(); - if (bridge == null) { - bridge = new MessageBridge(); - if (!ctx.inMsgBridge.compareAndSet(null, bridge)) { - bridge = ctx.inMsgBridge.get(); - } - } - return bridge.msgBuf; - } - } - ctx = ctx.next; - } - } - boolean hasNextOutboundByteBuffer(DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { @@ -1060,152 +987,41 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public void fireChannelRegistered() { - DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE); - if (ctx != null) { - fireChannelRegistered(ctx); - } - } - - static void fireChannelRegistered(DefaultChannelHandlerContext ctx) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - ctx.fireChannelRegisteredTask.run(); - } else { - executor.execute(ctx.fireChannelRegisteredTask); - } + head.fireChannelRegistered(); } @Override public void fireChannelUnregistered() { - DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE); - if (ctx != null) { - fireChannelUnregistered(ctx); - } - } - - static void fireChannelUnregistered(DefaultChannelHandlerContext ctx) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - ctx.fireChannelUnregisteredTask.run(); - } else { - executor.execute(ctx.fireChannelUnregisteredTask); - } + head.fireChannelUnregistered(); } @Override public void fireChannelActive() { - DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE); - if (ctx != null) { - firedChannelActive = true; - fireChannelActive(ctx); - if (fireInboundBufferUpdatedOnActivation) { - fireInboundBufferUpdatedOnActivation = false; - fireInboundBufferUpdated(ctx); - } - } - } - - static void fireChannelActive(DefaultChannelHandlerContext ctx) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - ctx.fireChannelActiveTask.run(); - } else { - executor.execute(ctx.fireChannelActiveTask); + firedChannelActive = true; + head.fireChannelActive(); + if (fireInboundBufferUpdatedOnActivation) { + fireInboundBufferUpdatedOnActivation = false; + head.fireInboundBufferUpdated(); } } @Override public void fireChannelInactive() { - DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE); - if (ctx != null) { - // Some implementations such as EmbeddedChannel can trigger inboundBufferUpdated() - // after deactivation, so it's safe not to revert the firedChannelActive flag here. - // Also, all known transports never get re-activated. - //firedChannelActive = false; - fireChannelInactive(ctx); - } - } - - static void fireChannelInactive(DefaultChannelHandlerContext ctx) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - ctx.fireChannelInactiveTask.run(); - } else { - executor.execute(ctx.fireChannelInactiveTask); - } + // Some implementations such as EmbeddedChannel can trigger inboundBufferUpdated() + // after deactivation, so it's safe not to revert the firedChannelActive flag here. + // Also, all known transports never get re-activated. + //firedChannelActive = false; + head.fireChannelInactive(); } @Override public void fireExceptionCaught(Throwable cause) { - DefaultChannelHandlerContext ctx = head.next; - if (ctx != null) { - fireExceptionCaught(ctx, cause); - } else { - logTerminalException(cause); - } - } - - static void logTerminalException(Throwable cause) { - logger.warn( - "An exceptionCaught() event was fired, and it reached at the end of the " + - "pipeline. It usually means the last inbound handler in the pipeline did not " + - "handle the exception.", cause); - } - - void fireExceptionCaught(final DefaultChannelHandlerContext ctx, final Throwable cause) { - if (cause == null) { - throw new NullPointerException("cause"); - } - - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - try { - ((ChannelInboundHandler) ctx.handler()).exceptionCaught(ctx, cause); - } catch (Throwable t) { - if (logger.isWarnEnabled()) { - logger.warn( - "An exception was thrown by a user handler's " + - "exceptionCaught() method while handling the following exception:", cause); - } - } - } else { - executor.execute(new Runnable() { - @Override - public void run() { - fireExceptionCaught(ctx, cause); - } - }); - } + head.fireExceptionCaught(cause); } @Override public void fireUserEventTriggered(Object event) { - DefaultChannelHandlerContext ctx = head.next; - if (ctx != null) { - fireUserEventTriggered(ctx, event); - } - } - - void fireUserEventTriggered(final DefaultChannelHandlerContext ctx, final Object event) { - if (event == null) { - throw new NullPointerException("event"); - } - - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - try { - ((ChannelInboundHandler) ctx.handler()).userEventTriggered(ctx, event); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - executor.execute(new Runnable() { - @Override - public void run() { - fireUserEventTriggered(ctx, event); - } - }); - } + head.fireUserEventTriggered(event); } @Override @@ -1214,19 +1030,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { fireInboundBufferUpdatedOnActivation = true; return; } - DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE); - if (ctx != null) { - fireInboundBufferUpdated(ctx); - } - } - - static void fireInboundBufferUpdated(DefaultChannelHandlerContext ctx) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - ctx.curCtxFireInboundBufferUpdatedTask.run(); - } else { - executor.execute(ctx.curCtxFireInboundBufferUpdatedTask); - } + head.fireInboundBufferUpdated(); } @Override