Clean up the pipeline implementation / Ensure Embedded*Channel does not run pending tasks immediately

- Replace ugly 'prev != null' check with explicit event scheduling
- Fix an incorrect flag operation in freeHandlerBuffersAfterRemoval()
- Fix a bug in AbstractEmbeddedChannel.doRegister where it makes pending tasks immediately, where the pending tasks actually triggers inbound events
- Remove unnecessary suppression of inboundBufferUpdated() event in DefaultChannelPipeline, which potentially hides an event ordering bug. Unfortunately, I don't remember why I added it in cca35454d2.
This commit is contained in:
Trustin Lee 2013-04-22 19:40:23 +09:00
parent 9c4bfa44d9
commit e80fb65c36
4 changed files with 90 additions and 46 deletions

View File

@ -675,8 +675,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
doDisconnect(); doDisconnect();
promise.setSuccess(); promise.setSuccess();
if (wasActive && !isActive()) { if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive(); pipeline.fireChannelInactive();
} }
});
}
} catch (Throwable t) { } catch (Throwable t) {
promise.setFailure(t); promise.setFailure(t);
closeIfClosed(); closeIfClosed();
@ -710,8 +715,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
flushFutureNotifier.notifyFlushFutures(closedChannelException); flushFutureNotifier.notifyFlushFutures(closedChannelException);
if (wasActive && !isActive()) { if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive(); pipeline.fireChannelInactive();
} }
});
}
deregister(voidFuture()); deregister(voidFuture());
} else { } else {
@ -754,7 +764,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (registered) { if (registered) {
registered = false; registered = false;
promise.setSuccess(); promise.setSuccess();
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelUnregistered(); pipeline.fireChannelUnregistered();
}
});
} else { } else {
// Some transports like local and AIO does not allow the deregistration of // Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently, // an open channel. Their doDeregister() calls close(). Consequently,
@ -785,8 +800,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (eventLoop().inEventLoop()) { if (eventLoop().inEventLoop()) {
try { try {
doBeginRead(); doBeginRead();
} catch (Exception e) { } catch (final Exception e) {
pipeline().fireExceptionCaught(e); invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(unsafe().voidFuture()); close(unsafe().voidFuture());
} }
} else { } else {
@ -849,10 +869,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private void flush0() { private void flush0() {
if (!inFlushNow) { // Avoid re-entrance if (!inFlushNow) { // Avoid re-entrance
try { try {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call flushNow() later,
// and thus there's no need to call it now.
if (!isFlushPending()) { if (!isFlushPending()) {
flushNow(); flushNow();
} else {
// Event loop will call flushNow() later by itself.
} }
} catch (Throwable t) { } catch (Throwable t) {
flushFutureNotifier.notifyFlushFutures(t); flushFutureNotifier.notifyFlushFutures(t);
@ -930,6 +951,21 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
close(voidFuture()); close(voidFuture());
} }
private void invokeLater(Runnable task) {
// This method is used by outbound operation implementations to trigger an inbound event later.
// They do not trigger an inbound event immediately because an outbound operation might have been
// triggered by another inbound event handler method. If fired immediately, the call stack
// will look like this for example:
//
// handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
// -> handlerA.ctx.close()
// -> channel.unsafe.close()
// -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
//
// which means the execution of two inbound handler methods of the same handler overlap undesirably.
eventLoop().execute(task);
}
} }
/** /**

View File

@ -24,11 +24,14 @@ import io.netty.buffer.Unpooled;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.PlatformDependent;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static io.netty.channel.DefaultChannelPipeline.*; import static io.netty.channel.DefaultChannelPipeline.*;
@ -309,17 +312,42 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
void initHeadHandler() { void initHeadHandler() {
// Must be called for the head handler. // Must be called for the head handler.
EventExecutor executor = executor();
if (executor.inEventLoop()) {
HeadHandler h = (HeadHandler) handler; HeadHandler h = (HeadHandler) handler;
if (h.initialized) { if (h.initialized) {
return; return;
} }
assert executor().inEventLoop();
h.init(this); h.init(this);
h.initialized = true; h.initialized = true;
outByteBuf = h.byteSink; outByteBuf = h.byteSink;
outMsgBuf = h.msgSink; outMsgBuf = h.msgSink;
} else {
Future<?> f = executor.submit(new Runnable() {
@Override
public void run() {
initHeadHandler();
}
});
boolean interrupted = false;
try {
while (!f.isDone()) {
try {
f.get();
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
PlatformDependent.throwException(e);
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
} }
private boolean flushInboundBridge() { private boolean flushInboundBridge() {
@ -401,8 +429,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
private void freeHandlerBuffersAfterRemoval() { private void freeHandlerBuffersAfterRemoval() {
if (flags == FLAG_REMOVED) { // Removed, but not freed yet int flags = this.flags;
flags |= FLAG_FREED; if ((flags & FLAG_REMOVED) != 0 && (flags & FLAG_FREED) == 0) { // Removed, but not freed yet
this.flags |= FLAG_FREED;
final ChannelHandler handler = handler(); final ChannelHandler handler = handler();
@ -691,7 +720,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public ChannelHandlerContext fireChannelUnregistered() { public ChannelHandlerContext fireChannelUnregistered() {
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (prev != null && executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelUnregistered(); next.invokeChannelUnregistered();
} else { } else {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@ -743,7 +772,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public ChannelHandlerContext fireChannelInactive() { public ChannelHandlerContext fireChannelInactive() {
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (prev != null && executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeChannelInactive(); next.invokeChannelInactive();
} else { } else {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@ -778,7 +807,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void invokeExceptionCaught(final Throwable cause) { private void invokeExceptionCaught(final Throwable cause) {
EventExecutor executor = executor(); EventExecutor executor = executor();
if (prev != null && executor.inEventLoop()) { if (executor.inEventLoop()) {
invokeExceptionCaught0(cause); invokeExceptionCaught0(cause);
} else { } else {
try { try {
@ -897,7 +926,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void invokeInboundBufferUpdated() { private void invokeInboundBufferUpdated() {
ChannelStateHandler handler = (ChannelStateHandler) handler(); ChannelStateHandler handler = (ChannelStateHandler) handler();
if (handler instanceof ChannelInboundHandler) { if (handler instanceof ChannelInboundHandler) {
for (;;) { for (;;) {
try { try {
@ -1424,7 +1452,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
void invokeFreeInboundBuffer() { void invokeFreeInboundBuffer() {
EventExecutor executor = executor(); EventExecutor executor = executor();
if (prev != null && executor.inEventLoop()) { if (executor.inEventLoop()) {
invokeFreeInboundBuffer0(); invokeFreeInboundBuffer0();
} else { } else {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@ -1517,11 +1545,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return; return;
} }
if (handler() instanceof ChannelStateHandler) {
invokeExceptionCaught(cause); invokeExceptionCaught(cause);
} else {
findContextInbound().invokeExceptionCaught(cause);
}
} }
private static boolean inExceptionCaught(Throwable cause) { private static boolean inExceptionCaught(Throwable cause) {

View File

@ -65,8 +65,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private final Map<String, DefaultChannelHandlerContext> name2ctx = private final Map<String, DefaultChannelHandlerContext> name2ctx =
new HashMap<String, DefaultChannelHandlerContext>(4); new HashMap<String, DefaultChannelHandlerContext>(4);
private boolean firedChannelActive;
private boolean fireInboundBufferUpdatedOnActivation;
final Map<EventExecutorGroup, EventExecutor> childExecutors = final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>(); new IdentityHashMap<EventExecutorGroup, EventExecutor>();
@ -814,7 +812,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline fireChannelActive() { public ChannelPipeline fireChannelActive() {
firedChannelActive = true;
head.initHeadHandler(); head.initHeadHandler();
head.fireChannelActive(); head.fireChannelActive();
@ -822,10 +819,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
channel.read(); channel.read();
} }
if (fireInboundBufferUpdatedOnActivation) {
fireInboundBufferUpdatedOnActivation = false;
head.fireInboundBufferUpdated();
}
return this; return this;
} }
@ -853,10 +846,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public ChannelPipeline fireInboundBufferUpdated() { public ChannelPipeline fireInboundBufferUpdated() {
if (!firedChannelActive) {
fireInboundBufferUpdatedOnActivation = true;
return this;
}
head.fireInboundBufferUpdated(); head.fireInboundBufferUpdated();
return this; return this;
} }

View File

@ -226,12 +226,7 @@ public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
@Override @Override
protected Runnable doDeregister() throws Exception { protected Runnable doDeregister() throws Exception {
return new Runnable() { return null;
@Override
public void run() {
runPendingTasks();
}
};
} }
@Override @Override