Simplified DefaultChannelPipeline by making its list head final

- Previously, head was a volatile field which is null at the beginning.
  While iterating over the pipeline, if the loop hits null, it called
  Channel.Unsafe explicitly.
- Instead, I created an outbound handler that redirects all requests
  to the unsafe and made it a final field of the pipeline.
- As a result, DefaultChannelPipeline code became much simpler.
This commit is contained in:
Trustin Lee 2012-06-03 18:51:42 -07:00
parent f6e14b636f
commit f3734e1eb9
3 changed files with 313 additions and 254 deletions

View File

@ -72,7 +72,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final Channel parent;
private final Integer id;
private final Unsafe unsafe;
private final ChannelPipeline pipeline;
private final DefaultChannelPipeline pipeline;
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
private final ChannelFuture voidFuture = new VoidChannelFuture(this);
private final CloseFuture closeFuture = new CloseFuture(this);
@ -82,7 +82,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private volatile EventLoop eventLoop;
private volatile boolean registered;
private final ChannelBufferHolder<Object> directOutbound;
private ClosedChannelException closedChannelException;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
private long writeCounter;
@ -123,7 +122,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
directOutbound = (ChannelBufferHolder<Object>) outboundBuffer;
pipeline = new DefaultChannelPipeline(this);
closeFuture().addListener(new ChannelFutureListener() {
@Override
@ -132,7 +131,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
});
pipeline = new DefaultChannelPipeline(this);
}
@Override
@ -385,7 +383,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public final ChannelBufferHolder<Object> directOutbound() {
return directOutbound;
return pipeline.directOutbound;
}
@Override
@ -628,7 +626,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
inFlushNow = true;
final ChannelBufferHolder<Object> out = directOutbound;
final ChannelBufferHolder<Object> out = directOutbound();
try {
Throwable cause = null;
int oldSize = out.size();

View File

@ -5,6 +5,8 @@ import io.netty.util.DefaultAttributeMap;
import java.net.SocketAddress;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelInboundHandlerContext<Object>, ChannelOutboundHandlerContext<Object> {
volatile DefaultChannelHandlerContext next;
@ -17,7 +19,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private final boolean canHandleInbound;
private final boolean canHandleOutbound;
final ChannelBufferHolder<Object> in;
private final ChannelBufferHolder<Object> out;
final ChannelBufferHolder<Object> out;
// When the two handlers run in a different thread and they are next to each other,
// each other's buffers can be accessed at the same time resuslting in a race condition.
// To avoid such situation, we lazily creates an additional thread-safe buffer called
// 'bridge' so that the two handlers access each other's buffer only via the bridges.
// The content written into a bridge is flushed into the actual buffer by flushBridge().
final AtomicReference<BlockingQueue<Object>> inMsgBridge;
final AtomicReference<BlockingQueue<Object>> outMsgBridge;
final AtomicReference<ChannelBuffer> inByteBridge;
final AtomicReference<ChannelBuffer> outByteBridge;
// Runnables that calls handlers
final Runnable fireChannelRegisteredTask = new Runnable() {
@ -73,6 +85,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@SuppressWarnings("unchecked")
public void run() {
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
flushBridge();
try {
((ChannelInboundHandler<Object>) ctx.handler()).inboundBufferUpdated(ctx);
} catch (Throwable t) {
@ -135,8 +148,23 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} catch (Exception e) {
throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e);
}
if (!in.isBypass()) {
if (in.hasByteBuffer()) {
inByteBridge = new AtomicReference<ChannelBuffer>();
inMsgBridge = null;
} else {
inByteBridge = null;
inMsgBridge = new AtomicReference<BlockingQueue<Object>>();
}
} else {
inByteBridge = null;
inMsgBridge = null;
}
} else {
in = null;
inByteBridge = null;
inMsgBridge = null;
}
if (canHandleOutbound) {
try {
@ -148,8 +176,38 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
// TODO Release the inbound buffer once pooling is implemented.
}
}
if (!out.isBypass()) {
if (out.hasByteBuffer()) {
outByteBridge = new AtomicReference<ChannelBuffer>();
outMsgBridge = null;
} else {
outByteBridge = null;
outMsgBridge = new AtomicReference<BlockingQueue<Object>>();
}
} else {
outByteBridge = null;
outMsgBridge = null;
}
} else {
out = null;
outByteBridge = null;
outMsgBridge = null;
}
}
void flushBridge() {
if (inMsgBridge != null) {
BlockingQueue<Object> bridge = inMsgBridge.get();
if (bridge != null) {
bridge.drainTo(in.messageBuffer());
}
}
if (outMsgBridge != null) {
BlockingQueue<Object> bridge = outMsgBridge.get();
if (bridge != null) {
bridge.drainTo(out.messageBuffer());
}
}
}
@ -229,7 +287,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public Queue<Object> nextInboundMessageBuffer() {
return DefaultChannelPipeline.nextInboundMessageBuffer(next);
return DefaultChannelPipeline.nextInboundMessageBuffer(executor(), next);
}
@Override
@ -239,7 +297,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public Queue<Object> nextOutboundMessageBuffer() {
return pipeline.nextOutboundMessageBuffer(prev);
return pipeline.nextOutboundMessageBuffer(executor(), prev);
}
@Override

View File

@ -18,6 +18,7 @@ package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.QueueFactory;
import java.net.SocketAddress;
import java.util.ArrayList;
@ -28,6 +29,9 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
/**
* The default {@link ChannelPipeline} implementation. It is usually created
@ -39,9 +43,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
final Channel channel;
private final Channel.Unsafe unsafe;
private final ChannelBufferHolder<Object> directOutbound;
final ChannelBufferHolder<Object> directOutbound;
private volatile DefaultChannelHandlerContext head;
private final DefaultChannelHandlerContext head;
private volatile DefaultChannelHandlerContext tail;
private final Map<String, DefaultChannelHandlerContext> name2ctx =
new HashMap<String, DefaultChannelHandlerContext>(4);
@ -56,8 +60,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new NullPointerException("channel");
}
this.channel = channel;
HeadHandler headHandler = new HeadHandler();
head = new DefaultChannelHandlerContext(
this, null, null, null, generateName(headHandler), headHandler);
tail = head;
directOutbound = head.out;
unsafe = channel.unsafe();
directOutbound = unsafe.directOutbound();
}
@Override
@ -72,23 +82,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public synchronized ChannelPipeline addFirst(EventExecutor executor, String name, ChannelHandler handler) {
if (name2ctx.isEmpty()) {
init(executor, name, handler);
} else {
checkDuplicateName(name);
DefaultChannelHandlerContext oldHead = head;
DefaultChannelHandlerContext newHead =
new DefaultChannelHandlerContext(this, executor, null, oldHead, name, handler);
checkDuplicateName(name);
DefaultChannelHandlerContext nextCtx = head.next;
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, executor, head, nextCtx, name, handler);
callBeforeAdd(newHead);
callBeforeAdd(newCtx);
oldHead.prev = newHead;
head = newHead;
name2ctx.put(name, newHead);
callAfterAdd(newHead);
if (nextCtx != null) {
nextCtx.prev = newCtx;
}
head.next = newCtx;
name2ctx.put(name, newCtx);
callAfterAdd(newCtx);
return this;
}
@ -99,23 +106,18 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public synchronized ChannelPipeline addLast(EventExecutor executor, String name, ChannelHandler handler) {
if (name2ctx.isEmpty()) {
init(executor, name, handler);
} else {
checkDuplicateName(name);
DefaultChannelHandlerContext oldTail = tail;
DefaultChannelHandlerContext newTail =
new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler);
checkDuplicateName(name);
DefaultChannelHandlerContext oldTail = tail;
DefaultChannelHandlerContext newTail =
new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler);
callBeforeAdd(newTail);
callBeforeAdd(newTail);
oldTail.next = newTail;
tail = newTail;
name2ctx.put(name, newTail);
callAfterAdd(newTail);
}
oldTail.next = newTail;
tail = newTail;
name2ctx.put(name, newTail);
callAfterAdd(newTail);
return this;
}
@ -127,22 +129,17 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public synchronized ChannelPipeline addBefore(EventExecutor executor, String baseName, String name, ChannelHandler handler) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
if (ctx == head) {
addFirst(name, handler);
} else {
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler);
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler);
callBeforeAdd(newCtx);
callBeforeAdd(newCtx);
ctx.prev.next = newCtx;
ctx.prev = newCtx;
name2ctx.put(name, newCtx);
callAfterAdd(newCtx);
}
ctx.prev.next = newCtx;
ctx.prev = newCtx;
name2ctx.put(name, newCtx);
callAfterAdd(newCtx);
return this;
}
@ -250,10 +247,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) {
if (head == tail) {
head = tail = null;
name2ctx.clear();
return null;
} else if (ctx == head) {
removeFirst();
throw new Error(); // Should never happen.
} else if (ctx == tail) {
removeLast();
} else {
@ -272,55 +268,26 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public synchronized ChannelHandler removeFirst() {
if (name2ctx.isEmpty()) {
if (head == tail) {
throw new NoSuchElementException();
}
DefaultChannelHandlerContext oldHead = head;
if (oldHead == null) {
throw new NoSuchElementException();
}
callBeforeRemove(oldHead);
if (oldHead.next == null) {
head = tail = null;
name2ctx.clear();
} else {
oldHead.next.prev = null;
head = oldHead.next;
name2ctx.remove(oldHead.name());
}
callAfterRemove(oldHead);
return oldHead.handler();
return remove(head.next).handler();
}
@Override
public synchronized ChannelHandler removeLast() {
if (name2ctx.isEmpty()) {
if (head == tail) {
throw new NoSuchElementException();
}
DefaultChannelHandlerContext oldTail = tail;
if (oldTail == null) {
throw new NoSuchElementException();
}
callBeforeRemove(oldTail);
if (oldTail.prev == null) {
head = tail = null;
name2ctx.clear();
} else {
oldTail.prev.next = null;
tail = oldTail.prev;
name2ctx.remove(oldTail.name());
}
oldTail.prev.next = null;
tail = oldTail.prev;
name2ctx.remove(oldTail.name());
callBeforeRemove(oldTail);
return oldTail.handler();
}
@ -343,8 +310,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
if (ctx == head) {
removeFirst();
addFirst(newName, newHandler);
throw new IllegalArgumentException();
} else if (ctx == tail) {
removeLast();
addLast(newName, newHandler);
@ -472,20 +438,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public synchronized ChannelHandler first() {
DefaultChannelHandlerContext head = this.head;
if (head == null) {
DefaultChannelHandlerContext first = head.next;
if (first == null) {
return null;
}
return head.handler();
return first.handler();
}
@Override
public synchronized ChannelHandler last() {
DefaultChannelHandlerContext tail = this.tail;
if (tail == null) {
DefaultChannelHandlerContext last = tail;
if (last == head || last == null) {
return null;
}
return tail.handler();
return last.handler();
}
@Override
@ -548,7 +514,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
if (name2ctx.isEmpty()) {
return null;
}
DefaultChannelHandlerContext ctx = head;
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
return ctx;
@ -569,7 +535,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return list;
}
DefaultChannelHandlerContext ctx = head;
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
list.add(ctx.name());
ctx = ctx.next;
@ -587,7 +553,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return map;
}
DefaultChannelHandlerContext ctx = head;
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
map.put(ctx.name(), ctx.handler());
ctx = ctx.next;
@ -606,7 +572,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
StringBuilder buf = new StringBuilder();
buf.append(getClass().getSimpleName());
buf.append('{');
DefaultChannelHandlerContext ctx = head;
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
buf.append('(');
buf.append(ctx.name());
@ -629,7 +595,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a message buffer.");
}
return nextInboundMessageBuffer(head);
return nextInboundMessageBuffer(null, head.next);
}
@Override
@ -638,12 +604,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a byte buffer.");
}
return nextInboundByteBuffer(head);
return nextInboundByteBuffer(head.next);
}
@Override
public Queue<Object> outboundMessageBuffer() {
return nextOutboundMessageBuffer(tail);
return nextOutboundMessageBuffer(null, tail);
}
@Override
@ -690,14 +656,27 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
static Queue<Object> nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) {
static Queue<Object> nextInboundMessageBuffer(
EventExecutor currentExecutor, DefaultChannelHandlerContext ctx) {
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
}
ChannelBufferHolder<Object> in = ctx.inbound();
if (in != null && !in.isBypass() && in.hasMessageBuffer()) {
return in.messageBuffer();
final AtomicReference<BlockingQueue<Object>> inMsgBridge = ctx.inMsgBridge;
if (inMsgBridge != null) {
if (currentExecutor == ctx.executor()) {
return ctx.in.messageBuffer();
} else {
BlockingQueue<Object> queue = inMsgBridge.get();
if (queue == null) {
queue = QueueFactory.createQueue();
if (!inMsgBridge.compareAndSet(null, queue)) {
queue = inMsgBridge.get();
}
}
return queue;
}
}
ctx = ctx.next;
}
@ -706,11 +685,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
boolean hasNextOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
for (;;) {
if (ctx == null) {
if (directOutbound.hasByteBuffer()) {
return true;
} else {
return false;
}
return false;
}
ChannelBufferHolder<Object> out = ctx.outbound();
@ -724,11 +699,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
boolean hasNextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) {
for (;;) {
if (ctx == null) {
if (directOutbound.hasMessageBuffer()) {
return true;
} else {
return false;
}
return false;
}
ChannelBufferHolder<Object> out = ctx.outbound();
@ -742,11 +713,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ChannelBuffer nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
for (;;) {
if (ctx == null) {
if (directOutbound.hasByteBuffer()) {
return directOutbound.byteBuffer();
} else {
throw new NoSuchBufferException();
}
throw new NoSuchBufferException();
}
ChannelBufferHolder<Object> out = ctx.outbound();
@ -757,20 +724,28 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
Queue<Object> nextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) {
Queue<Object> nextOutboundMessageBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) {
for (;;) {
if (ctx == null) {
if (directOutbound.hasMessageBuffer()) {
return directOutbound.messageBuffer();
throw new NoSuchBufferException();
}
final AtomicReference<BlockingQueue<Object>> outMsgBridge = ctx.outMsgBridge;
if (outMsgBridge != null) {
if (currentExecutor == ctx.executor()) {
return ctx.out.messageBuffer();
} else {
throw new NoSuchBufferException();
BlockingQueue<Object> queue = outMsgBridge.get();
if (queue == null) {
queue = QueueFactory.createQueue();
if (!outMsgBridge.compareAndSet(null, queue)) {
queue = outMsgBridge.get();
}
}
return queue;
}
}
ChannelBufferHolder<Object> out = ctx.outbound();
if (out != null && !out.isBypass() && out.hasMessageBuffer()) {
return out.messageBuffer();
}
ctx = ctx.prev;
}
}
@ -998,24 +973,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
validateFuture(future);
if (ctx != null) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).bind(ctx, localAddress, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
bind(ctx, localAddress, future);
}
});
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).bind(ctx, localAddress, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
unsafe.bind(localAddress, future);
executor.execute(new Runnable() {
@Override
public void run() {
bind(ctx, localAddress, future);
}
});
}
return future;
}
@ -1036,24 +1007,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
validateFuture(future);
if (ctx != null) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).connect(ctx, remoteAddress, localAddress, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
connect(ctx, remoteAddress, localAddress, future);
}
});
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).connect(ctx, remoteAddress, localAddress, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
unsafe.connect(remoteAddress, localAddress, future);
executor.execute(new Runnable() {
@Override
public void run() {
connect(ctx, remoteAddress, localAddress, future);
}
});
}
return future;
@ -1066,24 +1033,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ChannelFuture disconnect(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
validateFuture(future);
if (ctx != null) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).disconnect(ctx, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
disconnect(ctx, future);
}
});
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).disconnect(ctx, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
unsafe.disconnect(future);
executor.execute(new Runnable() {
@Override
public void run() {
disconnect(ctx, future);
}
});
}
return future;
@ -1096,24 +1059,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ChannelFuture close(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
validateFuture(future);
if (ctx != null) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).close(ctx, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
close(ctx, future);
}
});
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).close(ctx, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
unsafe.close(future);
executor.execute(new Runnable() {
@Override
public void run() {
close(ctx, future);
}
});
}
return future;
@ -1126,24 +1085,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ChannelFuture deregister(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
validateFuture(future);
if (ctx != null) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).deregister(ctx, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
deregister(ctx, future);
}
});
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelOutboundHandler<Object>) ctx.handler()).deregister(ctx, future);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
unsafe.deregister(future);
executor.execute(new Runnable() {
@Override
public void run() {
deregister(ctx, future);
}
});
}
return future;
@ -1156,20 +1111,16 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ChannelFuture flush(final DefaultChannelHandlerContext ctx, final ChannelFuture future) {
validateFuture(future);
if (ctx != null) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
flush0(ctx, future);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
flush(ctx, future);
}
});
}
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
flush0(ctx, future);
} else {
unsafe.flush(future);
executor.execute(new Runnable() {
@Override
public void run() {
flush(ctx, future);
}
});
}
return future;
@ -1177,6 +1128,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private void flush0(final DefaultChannelHandlerContext ctx, ChannelFuture future) {
try {
ctx.flushBridge();
((ChannelOutboundHandler<Object>) ctx.handler()).flush(ctx, future);
} catch (Throwable t) {
notifyHandlerException(t);
@ -1204,16 +1156,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
boolean msgBuf = false;
for (;;) {
if (ctx == null) {
executor = channel.eventLoop();
out = directOutbound;
if (out.hasByteBuffer()) {
if(!(message instanceof ChannelBuffer)) {
throw new NoSuchBufferException();
}
} else {
msgBuf = true;
}
break;
throw new NoSuchBufferException();
}
if (ctx.canHandleOutbound()) {
@ -1238,11 +1181,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ChannelBuffer buf = (ChannelBuffer) message;
out.byteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
}
if (ctx != null) {
flush0(ctx, future);
} else {
unsafe.flush(future);
}
flush0(ctx, future);
return future;
} else {
final DefaultChannelHandlerContext ctx0 = ctx;
@ -1274,7 +1213,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
private DefaultChannelHandlerContext firstInboundContext() {
return nextInboundContext(head);
return nextInboundContext(head.next);
}
private DefaultChannelHandlerContext firstOutboundContext() {
@ -1347,16 +1286,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return inExceptionCaught(cause.getCause());
}
private void init(EventExecutor executor, String name, ChannelHandler handler) {
DefaultChannelHandlerContext ctx =
new DefaultChannelHandlerContext(this, executor, null, null, name, handler);
callBeforeAdd(ctx);
head = tail = ctx;
name2ctx.clear();
name2ctx.put(name, ctx);
callAfterAdd(ctx);
}
private void checkDuplicateName(String name) {
if (name2ctx.containsKey(name)) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
@ -1365,7 +1294,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private DefaultChannelHandlerContext getContextOrDie(String name) {
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(name);
if (ctx == null) {
if (ctx == null || ctx == head) {
throw new NoSuchElementException(name);
} else {
return ctx;
@ -1374,7 +1303,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handler);
if (ctx == null) {
if (ctx == null || ctx == head) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
@ -1383,10 +1312,84 @@ public class DefaultChannelPipeline implements ChannelPipeline {
private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) context(handlerType);
if (ctx == null) {
if (ctx == null || ctx == head) {
throw new NoSuchElementException(handlerType.getName());
} else {
return ctx;
}
}
@SuppressWarnings("rawtypes")
private final class HeadHandler implements ChannelOutboundHandler {
@Override
public ChannelBufferHolder newOutboundBuffer(
ChannelOutboundHandlerContext ctx) throws Exception {
switch (channel.type()) {
case STREAM:
return ChannelBufferHolders.byteBuffer();
case MESSAGE:
return ChannelBufferHolders.messageBuffer();
default:
throw new Error();
}
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void bind(ChannelOutboundHandlerContext ctx,
SocketAddress localAddress, ChannelFuture future)
throws Exception {
unsafe.bind(localAddress, future);
}
@Override
public void connect(ChannelOutboundHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelFuture future) throws Exception {
unsafe.connect(remoteAddress, localAddress, future);
}
@Override
public void disconnect(ChannelOutboundHandlerContext ctx,
ChannelFuture future) throws Exception {
unsafe.disconnect(future);
}
@Override
public void close(ChannelOutboundHandlerContext ctx,
ChannelFuture future) throws Exception {
unsafe.close(future);
}
@Override
public void deregister(ChannelOutboundHandlerContext ctx,
ChannelFuture future) throws Exception {
unsafe.deregister(future);
}
@Override
public void flush(ChannelOutboundHandlerContext ctx,
ChannelFuture future) throws Exception {
unsafe.flush(future);
}
}
}