Move some logic from DefaultChannelPipeline to DefaultChannelHandlerContext

- Using the fact that head is always non-null, we can remove some code
  in DefaultChannelPipeline and move some to
  DefaultChannelHandlerContext
This commit is contained in:
Trustin Lee 2012-06-08 23:11:15 +09:00
parent 7c20426572
commit a507ea97ef
2 changed files with 160 additions and 227 deletions

View File

@ -129,7 +129,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
DefaultChannelHandlerContext.this.next, ChannelHandlerType.STATE);
if (next != null) {
next.fillBridge();
DefaultChannelPipeline.fireInboundBufferUpdated(next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.curCtxFireInboundBufferUpdatedTask.run();
} else {
executor.execute(next.curCtxFireInboundBufferUpdatedTask);
}
}
}
};
@ -381,12 +386,30 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public boolean hasNextInboundByteBuffer() {
return DefaultChannelPipeline.hasNextInboundByteBuffer(next);
DefaultChannelHandlerContext ctx = next;
for (;;) {
if (ctx == null) {
return false;
}
if (ctx.inByteBridge != null) {
return true;
}
ctx = ctx.next;
}
}
@Override
public boolean hasNextInboundMessageBuffer() {
return DefaultChannelPipeline.hasNextInboundMessageBuffer(next);
DefaultChannelHandlerContext ctx = next;
for (;;) {
if (ctx == null) {
return false;
}
if (ctx.inMsgBridge != null) {
return true;
}
ctx = ctx.next;
}
}
@Override
@ -401,12 +424,55 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelBuffer nextInboundByteBuffer() {
return DefaultChannelPipeline.nextInboundByteBuffer(next);
DefaultChannelHandlerContext ctx = next;
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
}
if (ctx.inByteBuf != null) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.inByteBuf;
} else {
StreamBridge bridge = ctx.inByteBridge.get();
if (bridge == null) {
bridge = new StreamBridge();
if (!ctx.inByteBridge.compareAndSet(null, bridge)) {
bridge = ctx.inByteBridge.get();
}
}
return bridge.byteBuf;
}
}
ctx = ctx.next;
}
}
@Override
public Queue<Object> nextInboundMessageBuffer() {
return DefaultChannelPipeline.nextInboundMessageBuffer(next);
DefaultChannelHandlerContext ctx = next;
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
}
if (ctx.inMsgBuf != null) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.inMsgBuf;
} else {
MessageBridge bridge = ctx.inMsgBridge.get();
if (bridge == null) {
bridge = new MessageBridge();
if (!ctx.inMsgBridge.compareAndSet(null, bridge)) {
bridge = ctx.inMsgBridge.get();
}
}
return bridge.msgBuf;
}
}
ctx = ctx.next;
}
}
@Override
@ -423,7 +489,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public void fireChannelRegistered() {
DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE);
if (next != null) {
DefaultChannelPipeline.fireChannelRegistered(next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.fireChannelRegisteredTask.run();
} else {
executor.execute(next.fireChannelRegisteredTask);
}
}
}
@ -431,7 +502,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public void fireChannelUnregistered() {
DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE);
if (next != null) {
DefaultChannelPipeline.fireChannelUnregistered(next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.fireChannelUnregisteredTask.run();
} else {
executor.execute(next.fireChannelUnregisteredTask);
}
}
}
@ -439,7 +515,12 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public void fireChannelActive() {
DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE);
if (next != null) {
DefaultChannelPipeline.fireChannelActive(next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.fireChannelActiveTask.run();
} else {
executor.execute(next.fireChannelActiveTask);
}
}
}
@ -447,25 +528,73 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public void fireChannelInactive() {
DefaultChannelHandlerContext next = nextContext(this.next, ChannelHandlerType.STATE);
if (next != null) {
DefaultChannelPipeline.fireChannelInactive(next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.fireChannelInactiveTask.run();
} else {
executor.execute(next.fireChannelInactiveTask);
}
}
}
@Override
public void fireExceptionCaught(Throwable cause) {
public void fireExceptionCaught(final Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
DefaultChannelHandlerContext next = this.next;
if (next != null) {
pipeline.fireExceptionCaught(next, cause);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
try {
next.handler().exceptionCaught(next, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", cause);
}
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
fireExceptionCaught(cause);
}
});
}
} else {
DefaultChannelPipeline.logTerminalException(cause);
logger.warn(
"An exceptionCaught() event was fired, and it reached at the end of the " +
"pipeline. It usually means the last inbound handler in the pipeline did not " +
"handle the exception.", cause);
}
}
@Override
public void fireUserEventTriggered(Object event) {
public void fireUserEventTriggered(final Object event) {
if (event == null) {
throw new NullPointerException("event");
}
DefaultChannelHandlerContext next = this.next;
if (next != null) {
pipeline.fireUserEventTriggered(next, event);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
try {
next.handler().userEventTriggered(next, event);
} catch (Throwable t) {
pipeline.notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
fireUserEventTriggered(event);
}
});
}
}
}

View File

@ -887,7 +887,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a message buffer.");
}
return nextInboundMessageBuffer(head.next);
return head.nextInboundMessageBuffer();
}
@Override
@ -896,7 +896,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a byte buffer.");
}
return nextInboundByteBuffer(head.next);
return head.nextInboundByteBuffer();
}
@Override
@ -909,79 +909,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return nextOutboundByteBuffer(tail);
}
static boolean hasNextInboundByteBuffer(DefaultChannelHandlerContext ctx) {
for (;;) {
if (ctx == null) {
return false;
}
if (ctx.inByteBridge != null) {
return true;
}
ctx = ctx.next;
}
}
static boolean hasNextInboundMessageBuffer(DefaultChannelHandlerContext ctx) {
for (;;) {
if (ctx == null) {
return false;
}
if (ctx.inMsgBridge != null) {
return true;
}
ctx = ctx.next;
}
}
static ChannelBuffer nextInboundByteBuffer(DefaultChannelHandlerContext ctx) {
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
}
if (ctx.inByteBuf != null) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.inByteBuf;
} else {
StreamBridge bridge = ctx.inByteBridge.get();
if (bridge == null) {
bridge = new StreamBridge();
if (!ctx.inByteBridge.compareAndSet(null, bridge)) {
bridge = ctx.inByteBridge.get();
}
}
return bridge.byteBuf;
}
}
ctx = ctx.next;
}
}
static Queue<Object> nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) {
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
}
if (ctx.inMsgBuf != null) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.inMsgBuf;
} else {
MessageBridge bridge = ctx.inMsgBridge.get();
if (bridge == null) {
bridge = new MessageBridge();
if (!ctx.inMsgBridge.compareAndSet(null, bridge)) {
bridge = ctx.inMsgBridge.get();
}
}
return bridge.msgBuf;
}
}
ctx = ctx.next;
}
}
boolean hasNextOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
for (;;) {
if (ctx == null) {
@ -1060,152 +987,41 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void fireChannelRegistered() {
DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE);
if (ctx != null) {
fireChannelRegistered(ctx);
}
}
static void fireChannelRegistered(DefaultChannelHandlerContext ctx) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
ctx.fireChannelRegisteredTask.run();
} else {
executor.execute(ctx.fireChannelRegisteredTask);
}
head.fireChannelRegistered();
}
@Override
public void fireChannelUnregistered() {
DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE);
if (ctx != null) {
fireChannelUnregistered(ctx);
}
}
static void fireChannelUnregistered(DefaultChannelHandlerContext ctx) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
ctx.fireChannelUnregisteredTask.run();
} else {
executor.execute(ctx.fireChannelUnregisteredTask);
}
head.fireChannelUnregistered();
}
@Override
public void fireChannelActive() {
DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE);
if (ctx != null) {
firedChannelActive = true;
fireChannelActive(ctx);
if (fireInboundBufferUpdatedOnActivation) {
fireInboundBufferUpdatedOnActivation = false;
fireInboundBufferUpdated(ctx);
}
}
}
static void fireChannelActive(DefaultChannelHandlerContext ctx) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
ctx.fireChannelActiveTask.run();
} else {
executor.execute(ctx.fireChannelActiveTask);
firedChannelActive = true;
head.fireChannelActive();
if (fireInboundBufferUpdatedOnActivation) {
fireInboundBufferUpdatedOnActivation = false;
head.fireInboundBufferUpdated();
}
}
@Override
public void fireChannelInactive() {
DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE);
if (ctx != null) {
// Some implementations such as EmbeddedChannel can trigger inboundBufferUpdated()
// after deactivation, so it's safe not to revert the firedChannelActive flag here.
// Also, all known transports never get re-activated.
//firedChannelActive = false;
fireChannelInactive(ctx);
}
}
static void fireChannelInactive(DefaultChannelHandlerContext ctx) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
ctx.fireChannelInactiveTask.run();
} else {
executor.execute(ctx.fireChannelInactiveTask);
}
// Some implementations such as EmbeddedChannel can trigger inboundBufferUpdated()
// after deactivation, so it's safe not to revert the firedChannelActive flag here.
// Also, all known transports never get re-activated.
//firedChannelActive = false;
head.fireChannelInactive();
}
@Override
public void fireExceptionCaught(Throwable cause) {
DefaultChannelHandlerContext ctx = head.next;
if (ctx != null) {
fireExceptionCaught(ctx, cause);
} else {
logTerminalException(cause);
}
}
static void logTerminalException(Throwable cause) {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the end of the " +
"pipeline. It usually means the last inbound handler in the pipeline did not " +
"handle the exception.", cause);
}
void fireExceptionCaught(final DefaultChannelHandlerContext ctx, final Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelInboundHandler<Object>) ctx.handler()).exceptionCaught(ctx, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler's " +
"exceptionCaught() method while handling the following exception:", cause);
}
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
fireExceptionCaught(ctx, cause);
}
});
}
head.fireExceptionCaught(cause);
}
@Override
public void fireUserEventTriggered(Object event) {
DefaultChannelHandlerContext ctx = head.next;
if (ctx != null) {
fireUserEventTriggered(ctx, event);
}
}
void fireUserEventTriggered(final DefaultChannelHandlerContext ctx, final Object event) {
if (event == null) {
throw new NullPointerException("event");
}
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
try {
((ChannelInboundHandler<Object>) ctx.handler()).userEventTriggered(ctx, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
executor.execute(new Runnable() {
@Override
public void run() {
fireUserEventTriggered(ctx, event);
}
});
}
head.fireUserEventTriggered(event);
}
@Override
@ -1214,19 +1030,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
fireInboundBufferUpdatedOnActivation = true;
return;
}
DefaultChannelHandlerContext ctx = firstContext(ChannelHandlerType.STATE);
if (ctx != null) {
fireInboundBufferUpdated(ctx);
}
}
static void fireInboundBufferUpdated(DefaultChannelHandlerContext ctx) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
ctx.curCtxFireInboundBufferUpdatedTask.run();
} else {
executor.execute(ctx.curCtxFireInboundBufferUpdatedTask);
}
head.fireInboundBufferUpdated();
}
@Override