diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index ce5cd83d51..0f2399b04a 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -19,8 +19,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements EventExecutor executor; // not thread-safe but OK because it never changes once set. private final String name; private final ChannelHandler handler; - private final boolean canHandleInbound; - private final boolean canHandleOutbound; final ChannelBufferHolder in; final ChannelBufferHolder out; @@ -41,7 +39,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public void run() { DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; try { - ((ChannelInboundHandler) ctx.handler()).channelRegistered(ctx); + ((ChannelInboundHandler) ctx.handler).channelRegistered(ctx); } catch (Throwable t) { pipeline.notifyHandlerException(t); } @@ -53,7 +51,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public void run() { DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; try { - ((ChannelInboundHandler) ctx.handler()).channelUnregistered(ctx); + ((ChannelInboundHandler) ctx.handler).channelUnregistered(ctx); } catch (Throwable t) { pipeline.notifyHandlerException(t); } @@ -65,7 +63,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public void run() { DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; try { - ((ChannelInboundHandler) ctx.handler()).channelActive(ctx); + ((ChannelInboundHandler) ctx.handler).channelActive(ctx); } catch (Throwable t) { pipeline.notifyHandlerException(t); } @@ -77,7 +75,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements public void run() { DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; try { - ((ChannelInboundHandler) ctx.handler()).channelInactive(ctx); + ((ChannelInboundHandler) ctx.handler).channelInactive(ctx); } catch (Throwable t) { pipeline.notifyHandlerException(t); } @@ -90,13 +88,15 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; flushBridge(); try { - ((ChannelInboundHandler) ctx.handler()).inboundBufferUpdated(ctx); + ((ChannelInboundHandler) ctx.handler).inboundBufferUpdated(ctx); } catch (Throwable t) { pipeline.notifyHandlerException(t); } finally { - ChannelBufferHolder inbound = ctx.inbound(); - if (!inbound.isBypass() && inbound.isEmpty() && inbound.hasByteBuffer()) { - inbound.byteBuffer().discardReadBytes(); + if (inByteBridge != null) { + ChannelBuffer buf = ctx.in.byteBuffer(); + if (!buf.readable()) { + buf.discardReadBytes(); + } } } } @@ -125,9 +125,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements if (handler == null) { throw new NullPointerException("handler"); } - canHandleInbound = handler instanceof ChannelInboundHandler; - canHandleOutbound = handler instanceof ChannelOutboundHandler; + boolean canHandleInbound = handler instanceof ChannelInboundHandler; + boolean canHandleOutbound = handler instanceof ChannelOutboundHandler; if (!canHandleInbound && !canHandleOutbound) { throw new IllegalArgumentException( "handler must be either " + @@ -293,12 +293,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public boolean canHandleInbound() { - return canHandleInbound; + return in != null; } @Override public boolean canHandleOutbound() { - return canHandleOutbound; + return out != null; } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 36952c55a9..b130e620ae 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -49,8 +49,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { private final Map name2ctx = new HashMap(4); private boolean firedChannelActive; - private boolean fireInboundBufferUpdatedOnActivation; - + private boolean fireInboundBufferUpdatedOnActivation; + final Map childExecutors = new IdentityHashMap(); @@ -87,27 +87,25 @@ public class DefaultChannelPipeline implements ChannelPipeline { new DefaultChannelHandlerContext(this, executor, head, nextCtx, name, handler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addFirst0(name, handler, nextCtx, newCtx); + addFirst0(name, nextCtx, newCtx); } else { try { newCtx.executor().submit(new Runnable() { - @Override public void run() { checkDuplicateName(name); - - addFirst0(name, handler, nextCtx, newCtx); + addFirst0(name, nextCtx, newCtx); } }).get(); } catch (Throwable t) { throw new ChannelException(t); } - + } - + return this; } - private void addFirst0(final String name, ChannelHandler handler, DefaultChannelHandlerContext nextCtx, DefaultChannelHandlerContext newCtx) { + private void addFirst0(final String name, DefaultChannelHandlerContext nextCtx, DefaultChannelHandlerContext newCtx) { callBeforeAdd(newCtx); if (nextCtx != null) { @@ -118,7 +116,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { callAfterAdd(newCtx); } - + @Override public ChannelPipeline addLast(String name, ChannelHandler handler) { return addLast(null, name, handler); @@ -131,36 +129,34 @@ public class DefaultChannelPipeline implements ChannelPipeline { final DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler); - + if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) { - addLast0(name, handler, oldTail, newTail); + addLast0(name, oldTail, newTail); } else { try { newTail.executor().submit(new Runnable() { - @Override public void run() { checkDuplicateName(name); - - addLast0(name, handler, oldTail, newTail); + addLast0(name, oldTail, newTail); } }).get(); } catch (Throwable t) { throw new ChannelException(t); } - + } return this; } - - private void addLast0(final String name, ChannelHandler handler, DefaultChannelHandlerContext oldTail, DefaultChannelHandlerContext newTail) { + + private void addLast0(final String name, DefaultChannelHandlerContext oldTail, DefaultChannelHandlerContext newTail) { callBeforeAdd(newTail); oldTail.next = newTail; tail = newTail; name2ctx.put(name, newTail); - callAfterAdd(newTail); + callAfterAdd(newTail); } @Override @@ -176,16 +172,14 @@ public class DefaultChannelPipeline implements ChannelPipeline { new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addBefore0(name, handler, ctx, newCtx); + addBefore0(name, ctx, newCtx); } else { try { newCtx.executor().submit(new Runnable() { - @Override public void run() { checkDuplicateName(name); - - addBefore0(name, handler, ctx, newCtx); + addBefore0(name, ctx, newCtx); } }).get(); } catch (Throwable t) { @@ -195,15 +189,16 @@ public class DefaultChannelPipeline implements ChannelPipeline { return this; } - private void addBefore0(final String name, ChannelHandler handler, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) { + private void addBefore0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) { callBeforeAdd(newCtx); ctx.prev.next = newCtx; ctx.prev = newCtx; name2ctx.put(name, newCtx); - callAfterAdd(newCtx); + callAfterAdd(newCtx); } + @Override public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { return addAfter(null, baseName, name, handler); @@ -218,31 +213,29 @@ public class DefaultChannelPipeline implements ChannelPipeline { checkDuplicateName(name); final DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, executor, ctx, ctx.next, name, handler); - + if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addAfter0(name, handler, ctx, newCtx); + addAfter0(name, ctx, newCtx); } else { try { newCtx.executor().submit(new Runnable() { - @Override public void run() { checkDuplicateName(name); - - addAfter0(name, handler, ctx, newCtx); + addAfter0(name, ctx, newCtx); } }).get(); } catch (Throwable t) { throw new ChannelException(t); } } - + } return this; } - private void addAfter0(final String name, ChannelHandler handler, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) { + private void addAfter0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) { checkDuplicateName(name); callBeforeAdd(newCtx); @@ -251,9 +244,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { ctx.next = newCtx; name2ctx.put(name, newCtx); - callAfterAdd(newCtx); + callAfterAdd(newCtx); } - + @Override public ChannelPipeline addFirst(ChannelHandler... handlers) { return addFirst(null, handlers); @@ -342,7 +335,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } else { try { ctx.executor().submit(new Runnable() { - + @Override public void run() { remove0(ctx); @@ -353,11 +346,11 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new ChannelException(t); } } - + } return ctx; } - + private void remove0(DefaultChannelHandlerContext ctx) { callBeforeRemove(ctx); @@ -390,7 +383,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } else { try { oldTail.executor().submit(new Runnable() { - + @Override public void run() { removeLast0(oldTail); @@ -400,12 +393,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { } catch (Throwable t) { throw new ChannelException(t); } - + } - + return oldTail.handler(); } - + private void removeLast0(DefaultChannelHandlerContext oldTail) { callBeforeRemove(oldTail); @@ -451,14 +444,13 @@ public class DefaultChannelPipeline implements ChannelPipeline { new DefaultChannelHandlerContext(this, ctx.executor, prev, next, newName, newHandler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - replace0(ctx, newName, newHandler, newCtx); + replace0(ctx, newName, newCtx); } else { try { newCtx.executor().submit(new Runnable() { - @Override public void run() { - replace0(ctx, newName, newHandler, newCtx); + replace0(ctx, newName, newCtx); } }).get(); @@ -466,18 +458,17 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new ChannelException(t); } } - } return ctx.handler(); } - private void replace0(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler, DefaultChannelHandlerContext newCtx) { + private void replace0(DefaultChannelHandlerContext ctx, String newName, DefaultChannelHandlerContext newCtx) { boolean sameName = ctx.name().equals(newName); DefaultChannelHandlerContext prev = ctx.prev; DefaultChannelHandlerContext next = ctx.next; - + callBeforeRemove(ctx); callBeforeAdd(newCtx); @@ -520,6 +511,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw addException; } } + private static void callBeforeAdd(ChannelHandlerContext ctx) { ChannelHandler handler = ctx.handler(); if (handler instanceof AbstractChannelHandler) { @@ -809,9 +801,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { return bridge.byteBuf; } } - ChannelBufferHolder in = ctx.in; - if (in != null && !in.isBypass() && in.hasByteBuffer()) { - return in.byteBuffer(); + if (ctx.inByteBridge != null) { + return ctx.in.byteBuffer(); } ctx = ctx.next; } @@ -1298,9 +1289,11 @@ public class DefaultChannelPipeline implements ChannelPipeline { } catch (Throwable t) { notifyHandlerException(t); } finally { - ChannelBufferHolder outbound = ctx.outbound(); - if (!outbound.isBypass() && outbound.isEmpty() && outbound.hasByteBuffer()) { - outbound.byteBuffer().discardReadBytes(); + if (ctx.outByteBridge != null) { + ChannelBuffer buf = ctx.out.byteBuffer(); + if (!buf.readable()) { + buf.discardReadBytes(); + } } } }