diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 67bcee7d80..0e9d3dd38d 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -72,7 +72,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final Channel parent; private final Integer id; private final Unsafe unsafe; - private final ChannelPipeline pipeline; + private final DefaultChannelPipeline pipeline; private final ChannelFuture succeededFuture = new SucceededChannelFuture(this); private final ChannelFuture voidFuture = new VoidChannelFuture(this); private final CloseFuture closeFuture = new CloseFuture(this); @@ -82,7 +82,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private volatile EventLoop eventLoop; private volatile boolean registered; - private final ChannelBufferHolder directOutbound; private ClosedChannelException closedChannelException; private final Deque flushCheckpoints = new ArrayDeque(); private long writeCounter; @@ -123,7 +122,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.id = id; unsafe = newUnsafe(); - directOutbound = (ChannelBufferHolder) outboundBuffer; + pipeline = new DefaultChannelPipeline(this); closeFuture().addListener(new ChannelFutureListener() { @Override @@ -132,7 +131,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } }); - pipeline = new DefaultChannelPipeline(this); } @Override @@ -385,7 +383,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public final ChannelBufferHolder directOutbound() { - return directOutbound; + return pipeline.directOutbound; } @Override @@ -628,7 +626,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } inFlushNow = true; - final ChannelBufferHolder out = directOutbound; + final ChannelBufferHolder out = directOutbound(); try { Throwable cause = null; int oldSize = out.size(); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 71c4313ae0..382d6d567b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -5,6 +5,8 @@ import io.netty.util.DefaultAttributeMap; import java.net.SocketAddress; import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelInboundHandlerContext, ChannelOutboundHandlerContext { volatile DefaultChannelHandlerContext next; @@ -17,7 +19,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private final boolean canHandleInbound; private final boolean canHandleOutbound; final ChannelBufferHolder in; - private final ChannelBufferHolder out; + final ChannelBufferHolder out; + + // When the two handlers run in a different thread and they are next to each other, + // each other's buffers can be accessed at the same time resuslting in a race condition. + // To avoid such situation, we lazily creates an additional thread-safe buffer called + // 'bridge' so that the two handlers access each other's buffer only via the bridges. + // The content written into a bridge is flushed into the actual buffer by flushBridge(). + final AtomicReference> inMsgBridge; + final AtomicReference> outMsgBridge; + final AtomicReference inByteBridge; + final AtomicReference outByteBridge; // Runnables that calls handlers final Runnable fireChannelRegisteredTask = new Runnable() { @@ -73,6 +85,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @SuppressWarnings("unchecked") public void run() { DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this; + flushBridge(); try { ((ChannelInboundHandler) ctx.handler()).inboundBufferUpdated(ctx); } catch (Throwable t) { @@ -135,8 +148,23 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } catch (Exception e) { throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e); } + + if (!in.isBypass()) { + if (in.hasByteBuffer()) { + inByteBridge = new AtomicReference(); + inMsgBridge = null; + } else { + inByteBridge = null; + inMsgBridge = new AtomicReference>(); + } + } else { + inByteBridge = null; + inMsgBridge = null; + } } else { in = null; + inByteBridge = null; + inMsgBridge = null; } if (canHandleOutbound) { try { @@ -148,8 +176,38 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements // TODO Release the inbound buffer once pooling is implemented. } } + + if (!out.isBypass()) { + if (out.hasByteBuffer()) { + outByteBridge = new AtomicReference(); + outMsgBridge = null; + } else { + outByteBridge = null; + outMsgBridge = new AtomicReference>(); + } + } else { + outByteBridge = null; + outMsgBridge = null; + } } else { out = null; + outByteBridge = null; + outMsgBridge = null; + } + } + + void flushBridge() { + if (inMsgBridge != null) { + BlockingQueue bridge = inMsgBridge.get(); + if (bridge != null) { + bridge.drainTo(in.messageBuffer()); + } + } + if (outMsgBridge != null) { + BlockingQueue bridge = outMsgBridge.get(); + if (bridge != null) { + bridge.drainTo(out.messageBuffer()); + } } } @@ -229,7 +287,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public Queue nextInboundMessageBuffer() { - return DefaultChannelPipeline.nextInboundMessageBuffer(next); + return DefaultChannelPipeline.nextInboundMessageBuffer(executor(), next); } @Override @@ -239,7 +297,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public Queue nextOutboundMessageBuffer() { - return pipeline.nextOutboundMessageBuffer(prev); + return pipeline.nextOutboundMessageBuffer(executor(), prev); } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index af9944974e..76dc026a70 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -18,6 +18,7 @@ package io.netty.channel; import io.netty.buffer.ChannelBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; +import io.netty.util.internal.QueueFactory; import java.net.SocketAddress; import java.util.ArrayList; @@ -28,6 +29,9 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; /** * The default {@link ChannelPipeline} implementation. It is usually created @@ -39,9 +43,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { final Channel channel; private final Channel.Unsafe unsafe; - private final ChannelBufferHolder directOutbound; + final ChannelBufferHolder directOutbound; - private volatile DefaultChannelHandlerContext head; + private final DefaultChannelHandlerContext head; private volatile DefaultChannelHandlerContext tail; private final Map name2ctx = new HashMap(4); @@ -56,8 +60,14 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NullPointerException("channel"); } this.channel = channel; + + HeadHandler headHandler = new HeadHandler(); + head = new DefaultChannelHandlerContext( + this, null, null, null, generateName(headHandler), headHandler); + tail = head; + + directOutbound = head.out; unsafe = channel.unsafe(); - directOutbound = unsafe.directOutbound(); } @Override @@ -72,23 +82,20 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public synchronized ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler) { - if (name2ctx.isEmpty()) { - init(executor, name, handler); - } else { - checkDuplicateName(name); - DefaultChannelHandlerContext oldHead = head; - DefaultChannelHandlerContext newHead = - new DefaultChannelHandlerContext(this, executor, null, oldHead, name, handler); + checkDuplicateName(name); + DefaultChannelHandlerContext nextCtx = head.next; + DefaultChannelHandlerContext newCtx = + new DefaultChannelHandlerContext(this, executor, head, nextCtx, name, handler); - callBeforeAdd(newHead); + callBeforeAdd(newCtx); - oldHead.prev = newHead; - head = newHead; - name2ctx.put(name, newHead); - - callAfterAdd(newHead); + if (nextCtx != null) { + nextCtx.prev = newCtx; } + head.next = newCtx; + name2ctx.put(name, newCtx); + callAfterAdd(newCtx); return this; } @@ -99,23 +106,18 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public synchronized ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler) { - if (name2ctx.isEmpty()) { - init(executor, name, handler); - } else { - checkDuplicateName(name); - DefaultChannelHandlerContext oldTail = tail; - DefaultChannelHandlerContext newTail = - new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler); + checkDuplicateName(name); + DefaultChannelHandlerContext oldTail = tail; + DefaultChannelHandlerContext newTail = + new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler); - callBeforeAdd(newTail); + callBeforeAdd(newTail); - oldTail.next = newTail; - tail = newTail; - name2ctx.put(name, newTail); - - callAfterAdd(newTail); - } + oldTail.next = newTail; + tail = newTail; + name2ctx.put(name, newTail); + callAfterAdd(newTail); return this; } @@ -127,22 +129,17 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public synchronized ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler) { DefaultChannelHandlerContext ctx = getContextOrDie(baseName); - if (ctx == head) { - addFirst(name, handler); - } else { - checkDuplicateName(name); - DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler); + checkDuplicateName(name); + DefaultChannelHandlerContext newCtx = + new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler); - callBeforeAdd(newCtx); + callBeforeAdd(newCtx); - ctx.prev.next = newCtx; - ctx.prev = newCtx; - name2ctx.put(name, newCtx); - - callAfterAdd(newCtx); - } + ctx.prev.next = newCtx; + ctx.prev = newCtx; + name2ctx.put(name, newCtx); + callAfterAdd(newCtx); return this; } @@ -250,10 +247,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) { if (head == tail) { - head = tail = null; - name2ctx.clear(); + return null; } else if (ctx == head) { - removeFirst(); + throw new Error(); // Should never happen. } else if (ctx == tail) { removeLast(); } else { @@ -272,55 +268,26 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public synchronized ChannelHandler removeFirst() { - if (name2ctx.isEmpty()) { + if (head == tail) { throw new NoSuchElementException(); } - - DefaultChannelHandlerContext oldHead = head; - if (oldHead == null) { - throw new NoSuchElementException(); - } - - callBeforeRemove(oldHead); - - if (oldHead.next == null) { - head = tail = null; - name2ctx.clear(); - } else { - oldHead.next.prev = null; - head = oldHead.next; - name2ctx.remove(oldHead.name()); - } - - callAfterRemove(oldHead); - - return oldHead.handler(); + return remove(head.next).handler(); } @Override public synchronized ChannelHandler removeLast() { - if (name2ctx.isEmpty()) { + if (head == tail) { throw new NoSuchElementException(); } DefaultChannelHandlerContext oldTail = tail; - if (oldTail == null) { - throw new NoSuchElementException(); - } - callBeforeRemove(oldTail); - if (oldTail.prev == null) { - head = tail = null; - name2ctx.clear(); - } else { - oldTail.prev.next = null; - tail = oldTail.prev; - name2ctx.remove(oldTail.name()); - } + oldTail.prev.next = null; + tail = oldTail.prev; + name2ctx.remove(oldTail.name()); callBeforeRemove(oldTail); - return oldTail.handler(); } @@ -343,8 +310,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) { if (ctx == head) { - removeFirst(); - addFirst(newName, newHandler); + throw new IllegalArgumentException(); } else if (ctx == tail) { removeLast(); addLast(newName, newHandler); @@ -472,20 +438,20 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public synchronized ChannelHandler first() { - DefaultChannelHandlerContext head = this.head; - if (head == null) { + DefaultChannelHandlerContext first = head.next; + if (first == null) { return null; } - return head.handler(); + return first.handler(); } @Override public synchronized ChannelHandler last() { - DefaultChannelHandlerContext tail = this.tail; - if (tail == null) { + DefaultChannelHandlerContext last = tail; + if (last == head || last == null) { return null; } - return tail.handler(); + return last.handler(); } @Override @@ -548,7 +514,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (name2ctx.isEmpty()) { return null; } - DefaultChannelHandlerContext ctx = head; + DefaultChannelHandlerContext ctx = head.next; for (;;) { if (handlerType.isAssignableFrom(ctx.handler().getClass())) { return ctx; @@ -569,7 +535,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { return list; } - DefaultChannelHandlerContext ctx = head; + DefaultChannelHandlerContext ctx = head.next; for (;;) { list.add(ctx.name()); ctx = ctx.next; @@ -587,7 +553,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { return map; } - DefaultChannelHandlerContext ctx = head; + DefaultChannelHandlerContext ctx = head.next; for (;;) { map.put(ctx.name(), ctx.handler()); ctx = ctx.next; @@ -606,7 +572,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { StringBuilder buf = new StringBuilder(); buf.append(getClass().getSimpleName()); buf.append('{'); - DefaultChannelHandlerContext ctx = head; + DefaultChannelHandlerContext ctx = head.next; for (;;) { buf.append('('); buf.append(ctx.name()); @@ -629,7 +595,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NoSuchBufferException( "The first inbound buffer of this channel must be a message buffer."); } - return nextInboundMessageBuffer(head); + return nextInboundMessageBuffer(null, head.next); } @Override @@ -638,12 +604,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NoSuchBufferException( "The first inbound buffer of this channel must be a byte buffer."); } - return nextInboundByteBuffer(head); + return nextInboundByteBuffer(head.next); } @Override public Queue outboundMessageBuffer() { - return nextOutboundMessageBuffer(tail); + return nextOutboundMessageBuffer(null, tail); } @Override @@ -690,14 +656,27 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - static Queue nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) { + static Queue nextInboundMessageBuffer( + EventExecutor currentExecutor, DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { throw new NoSuchBufferException(); } - ChannelBufferHolder in = ctx.inbound(); - if (in != null && !in.isBypass() && in.hasMessageBuffer()) { - return in.messageBuffer(); + + final AtomicReference> inMsgBridge = ctx.inMsgBridge; + if (inMsgBridge != null) { + if (currentExecutor == ctx.executor()) { + return ctx.in.messageBuffer(); + } else { + BlockingQueue queue = inMsgBridge.get(); + if (queue == null) { + queue = QueueFactory.createQueue(); + if (!inMsgBridge.compareAndSet(null, queue)) { + queue = inMsgBridge.get(); + } + } + return queue; + } } ctx = ctx.next; } @@ -706,11 +685,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { boolean hasNextOutboundByteBuffer(DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { - if (directOutbound.hasByteBuffer()) { - return true; - } else { - return false; - } + return false; } ChannelBufferHolder out = ctx.outbound(); @@ -724,11 +699,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { boolean hasNextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { - if (directOutbound.hasMessageBuffer()) { - return true; - } else { - return false; - } + return false; } ChannelBufferHolder out = ctx.outbound(); @@ -742,11 +713,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { ChannelBuffer nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { - if (directOutbound.hasByteBuffer()) { - return directOutbound.byteBuffer(); - } else { - throw new NoSuchBufferException(); - } + throw new NoSuchBufferException(); } ChannelBufferHolder out = ctx.outbound(); @@ -757,20 +724,28 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - Queue nextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) { + Queue nextOutboundMessageBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) { for (;;) { if (ctx == null) { - if (directOutbound.hasMessageBuffer()) { - return directOutbound.messageBuffer(); + throw new NoSuchBufferException(); + } + + final AtomicReference> outMsgBridge = ctx.outMsgBridge; + if (outMsgBridge != null) { + if (currentExecutor == ctx.executor()) { + return ctx.out.messageBuffer(); } else { - throw new NoSuchBufferException(); + BlockingQueue queue = outMsgBridge.get(); + if (queue == null) { + queue = QueueFactory.createQueue(); + if (!outMsgBridge.compareAndSet(null, queue)) { + queue = outMsgBridge.get(); + } + } + return queue; } } - ChannelBufferHolder out = ctx.outbound(); - if (out != null && !out.isBypass() && out.hasMessageBuffer()) { - return out.messageBuffer(); - } ctx = ctx.prev; } } @@ -998,24 +973,20 @@ public class DefaultChannelPipeline implements ChannelPipeline { } validateFuture(future); - if (ctx != null) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - try { - ((ChannelOutboundHandler) ctx.handler()).bind(ctx, localAddress, future); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - executor.execute(new Runnable() { - @Override - public void run() { - bind(ctx, localAddress, future); - } - }); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).bind(ctx, localAddress, future); + } catch (Throwable t) { + notifyHandlerException(t); } } else { - unsafe.bind(localAddress, future); + executor.execute(new Runnable() { + @Override + public void run() { + bind(ctx, localAddress, future); + } + }); } return future; } @@ -1036,24 +1007,20 @@ public class DefaultChannelPipeline implements ChannelPipeline { } validateFuture(future); - if (ctx != null) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - try { - ((ChannelOutboundHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, future); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - executor.execute(new Runnable() { - @Override - public void run() { - connect(ctx, remoteAddress, localAddress, future); - } - }); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, future); + } catch (Throwable t) { + notifyHandlerException(t); } } else { - unsafe.connect(remoteAddress, localAddress, future); + executor.execute(new Runnable() { + @Override + public void run() { + connect(ctx, remoteAddress, localAddress, future); + } + }); } return future; @@ -1066,24 +1033,20 @@ public class DefaultChannelPipeline implements ChannelPipeline { ChannelFuture disconnect(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { validateFuture(future); - if (ctx != null) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - try { - ((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, future); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - executor.execute(new Runnable() { - @Override - public void run() { - disconnect(ctx, future); - } - }); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, future); + } catch (Throwable t) { + notifyHandlerException(t); } } else { - unsafe.disconnect(future); + executor.execute(new Runnable() { + @Override + public void run() { + disconnect(ctx, future); + } + }); } return future; @@ -1096,24 +1059,20 @@ public class DefaultChannelPipeline implements ChannelPipeline { ChannelFuture close(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { validateFuture(future); - if (ctx != null) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - try { - ((ChannelOutboundHandler) ctx.handler()).close(ctx, future); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - executor.execute(new Runnable() { - @Override - public void run() { - close(ctx, future); - } - }); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).close(ctx, future); + } catch (Throwable t) { + notifyHandlerException(t); } } else { - unsafe.close(future); + executor.execute(new Runnable() { + @Override + public void run() { + close(ctx, future); + } + }); } return future; @@ -1126,24 +1085,20 @@ public class DefaultChannelPipeline implements ChannelPipeline { ChannelFuture deregister(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { validateFuture(future); - if (ctx != null) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - try { - ((ChannelOutboundHandler) ctx.handler()).deregister(ctx, future); - } catch (Throwable t) { - notifyHandlerException(t); - } - } else { - executor.execute(new Runnable() { - @Override - public void run() { - deregister(ctx, future); - } - }); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + try { + ((ChannelOutboundHandler) ctx.handler()).deregister(ctx, future); + } catch (Throwable t) { + notifyHandlerException(t); } } else { - unsafe.deregister(future); + executor.execute(new Runnable() { + @Override + public void run() { + deregister(ctx, future); + } + }); } return future; @@ -1156,20 +1111,16 @@ public class DefaultChannelPipeline implements ChannelPipeline { ChannelFuture flush(final DefaultChannelHandlerContext ctx, final ChannelFuture future) { validateFuture(future); - if (ctx != null) { - EventExecutor executor = ctx.executor(); - if (executor.inEventLoop()) { - flush0(ctx, future); - } else { - executor.execute(new Runnable() { - @Override - public void run() { - flush(ctx, future); - } - }); - } + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + flush0(ctx, future); } else { - unsafe.flush(future); + executor.execute(new Runnable() { + @Override + public void run() { + flush(ctx, future); + } + }); } return future; @@ -1177,6 +1128,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private void flush0(final DefaultChannelHandlerContext ctx, ChannelFuture future) { try { + ctx.flushBridge(); ((ChannelOutboundHandler) ctx.handler()).flush(ctx, future); } catch (Throwable t) { notifyHandlerException(t); @@ -1204,16 +1156,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { boolean msgBuf = false; for (;;) { if (ctx == null) { - executor = channel.eventLoop(); - out = directOutbound; - if (out.hasByteBuffer()) { - if(!(message instanceof ChannelBuffer)) { - throw new NoSuchBufferException(); - } - } else { - msgBuf = true; - } - break; + throw new NoSuchBufferException(); } if (ctx.canHandleOutbound()) { @@ -1238,11 +1181,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { ChannelBuffer buf = (ChannelBuffer) message; out.byteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes()); } - if (ctx != null) { - flush0(ctx, future); - } else { - unsafe.flush(future); - } + flush0(ctx, future); return future; } else { final DefaultChannelHandlerContext ctx0 = ctx; @@ -1274,7 +1213,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } private DefaultChannelHandlerContext firstInboundContext() { - return nextInboundContext(head); + return nextInboundContext(head.next); } private DefaultChannelHandlerContext firstOutboundContext() { @@ -1347,16 +1286,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { return inExceptionCaught(cause.getCause()); } - private void init(EventExecutor executor, String name, ChannelHandler handler) { - DefaultChannelHandlerContext ctx = - new DefaultChannelHandlerContext(this, executor, null, null, name, handler); - callBeforeAdd(ctx); - head = tail = ctx; - name2ctx.clear(); - name2ctx.put(name, ctx); - callAfterAdd(ctx); - } - private void checkDuplicateName(String name) { if (name2ctx.containsKey(name)) { throw new IllegalArgumentException("Duplicate handler name: " + name); @@ -1365,7 +1294,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private DefaultChannelHandlerContext getContextOrDie(String name) { DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(name); - if (ctx == null) { + if (ctx == null || ctx == head) { throw new NoSuchElementException(name); } else { return ctx; @@ -1374,7 +1303,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) { DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handler); - if (ctx == null) { + if (ctx == null || ctx == head) { throw new NoSuchElementException(handler.getClass().getName()); } else { return ctx; @@ -1383,10 +1312,84 @@ public class DefaultChannelPipeline implements ChannelPipeline { private DefaultChannelHandlerContext getContextOrDie(Class handlerType) { DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handlerType); - if (ctx == null) { + if (ctx == null || ctx == head) { throw new NoSuchElementException(handlerType.getName()); } else { return ctx; } } + + @SuppressWarnings("rawtypes") + private final class HeadHandler implements ChannelOutboundHandler { + @Override + public ChannelBufferHolder newOutboundBuffer( + ChannelOutboundHandlerContext ctx) throws Exception { + switch (channel.type()) { + case STREAM: + return ChannelBufferHolders.byteBuffer(); + case MESSAGE: + return ChannelBufferHolders.messageBuffer(); + default: + throw new Error(); + } + } + + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void bind(ChannelOutboundHandlerContext ctx, + SocketAddress localAddress, ChannelFuture future) + throws Exception { + unsafe.bind(localAddress, future); + } + + @Override + public void connect(ChannelOutboundHandlerContext ctx, + SocketAddress remoteAddress, SocketAddress localAddress, + ChannelFuture future) throws Exception { + unsafe.connect(remoteAddress, localAddress, future); + } + + @Override + public void disconnect(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + unsafe.disconnect(future); + } + + @Override + public void close(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + unsafe.close(future); + } + + @Override + public void deregister(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + unsafe.deregister(future); + } + + @Override + public void flush(ChannelOutboundHandlerContext ctx, + ChannelFuture future) throws Exception { + unsafe.flush(future); + } + } }