diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index b8dbfc0706..5d52be18f8 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -72,7 +72,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final Channel parent; private final Integer id; private final Unsafe unsafe; - private final ChannelPipeline pipeline = new DefaultChannelPipeline(this); + private final ChannelPipeline pipeline; private final ChannelFuture succeededFuture = new SucceededChannelFuture(this); private final ChannelFuture voidFuture = new VoidChannelFuture(this); private final CloseFuture closeFuture = new CloseFuture(this); @@ -131,6 +131,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha allChannels.remove(id()); } }); + + pipeline = new DefaultChannelPipeline(this); } @Override @@ -150,6 +152,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public EventLoop eventLoop() { + EventLoop eventLoop = this.eventLoop; if (eventLoop == null) { throw new IllegalStateException("channel not registered to an event loop"); } @@ -581,13 +584,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - // Attempt/perform outbound I/O if: - // - the channel is inactive - flush0() will fail the futures. - // - the event loop has no plan to call flushForcibly(). - if (!inFlushNow) { + if (!inFlushNow) { // Avoid re-entrance try { - if (!isActive() || !isFlushPending()) { + if (!isFlushPending()) { flushNow(); + } else { + // Event loop will call flushNow() later by itself. } } catch (Throwable t) { notifyFlushFutures(t); @@ -647,10 +649,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha close(voidFuture()); } } - - if (!isActive()) { - close(unsafe().voidFuture()); - } } finally { inFlushNow = false; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 3567eeb910..59ae0dbef0 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -9,6 +9,7 @@ import java.util.Queue; final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelInboundHandlerContext, ChannelOutboundHandlerContext { volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext prev; + private final Channel channel; private final DefaultChannelPipeline pipeline; final EventExecutor executor; private final String name; @@ -110,6 +111,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements this.prev = prev; this.next = next; + channel = pipeline.channel; this.pipeline = pipeline; this.name = name; this.handler = handler; @@ -153,7 +155,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public Channel channel() { - return pipeline.channel; + return channel; } @Override @@ -164,7 +166,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public EventExecutor executor() { if (executor == null) { - return channel().eventLoop(); + return channel.eventLoop(); } else { return executor; } @@ -360,16 +362,16 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public ChannelFuture newFuture() { - return channel().newFuture(); + return channel.newFuture(); } @Override public ChannelFuture newSucceededFuture() { - return channel().newSucceededFuture(); + return channel.newSucceededFuture(); } @Override public ChannelFuture newFailedFuture(Throwable cause) { - return channel().newFailedFuture(cause); + return channel.newFailedFuture(cause); } } \ No newline at end of file diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 2c6cb3431a..8d22b09969 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -38,6 +38,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); final Channel channel; + private final Channel.Unsafe unsafe; + private final ChannelBufferHolder directOutbound; + private volatile DefaultChannelHandlerContext head; private volatile DefaultChannelHandlerContext tail; private final Map name2ctx = @@ -53,6 +56,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NullPointerException("channel"); } this.channel = channel; + unsafe = channel.unsafe(); + directOutbound = unsafe.directOutbound(); } @Override @@ -667,9 +672,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { ChannelBuffer nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { - ChannelBufferHolder lastOut = channel().unsafe().directOutbound(); - if (lastOut.hasByteBuffer()) { - return lastOut.byteBuffer(); + if (directOutbound.hasByteBuffer()) { + return directOutbound.byteBuffer(); } else { throw NoSuchBufferException.INSTANCE; } @@ -686,9 +690,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { Queue nextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { - ChannelBufferHolder lastOut = channel().unsafe().directOutbound(); - if (lastOut.hasMessageBuffer()) { - return lastOut.messageBuffer(); + if (directOutbound.hasMessageBuffer()) { + return directOutbound.messageBuffer(); } else { throw NoSuchBufferException.INSTANCE; } @@ -942,7 +945,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { }); } } else { - channel().unsafe().bind(localAddress, future); + unsafe.bind(localAddress, future); } return future; } @@ -980,7 +983,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { }); } } else { - channel().unsafe().connect(remoteAddress, localAddress, future); + unsafe.connect(remoteAddress, localAddress, future); } return future; @@ -1010,7 +1013,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { }); } } else { - channel().unsafe().disconnect(future); + unsafe.disconnect(future); } return future; @@ -1040,7 +1043,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { }); } } else { - channel().unsafe().close(future); + unsafe.close(future); } return future; @@ -1070,7 +1073,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { }); } } else { - channel().unsafe().deregister(future); + unsafe.deregister(future); } return future; @@ -1086,16 +1089,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (ctx != null) { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { - try { - ((ChannelOutboundHandler) ctx.handler()).flush(ctx, future); - } catch (Throwable t) { - notifyHandlerException(t); - } finally { - ChannelBufferHolder outbound = ctx.outbound(); - if (!outbound.isBypass() && outbound.isEmpty() && outbound.hasByteBuffer()) { - outbound.byteBuffer().discardReadBytes(); - } - } + flush0(ctx, future); } else { executor.execute(new Runnable() { @Override @@ -1105,12 +1099,25 @@ public class DefaultChannelPipeline implements ChannelPipeline { }); } } else { - channel().unsafe().flush(future); + unsafe.flush(future); } return future; } + private void flush0(final DefaultChannelHandlerContext ctx, ChannelFuture future) { + try { + ((ChannelOutboundHandler) ctx.handler()).flush(ctx, future); + } catch (Throwable t) { + notifyHandlerException(t); + } finally { + ChannelBufferHolder outbound = ctx.outbound(); + if (!outbound.isBypass() && outbound.isEmpty() && outbound.hasByteBuffer()) { + outbound.byteBuffer().discardReadBytes(); + } + } + } + @Override public ChannelFuture write(Object message, ChannelFuture future) { return write(firstOutboundContext(), message, future); @@ -1129,7 +1136,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { out = ctx.outbound(); } else { executor = channel().eventLoop(); - out = channel().unsafe().directOutbound(); + out = directOutbound; } if (executor.inEventLoop()) { @@ -1143,7 +1150,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { "cannot write a message whose type is not " + ChannelBuffer.class.getSimpleName() + ": " + message.getClass().getName()); } - return flush(ctx, future); + if (ctx != null) { + flush0(ctx, future); + } else { + unsafe.flush(future); + } + return future; } else { executor.execute(new Runnable() { @Override