Revert 25c7a783a7 and fix #1064 differently

- Rename inbound/outboundBufferFreed to inbound/OutboundShutdown which makes more sense
- Move DefaultChannelHandlerContext.isInbound/OutboundBufferFreed() to DefaultChannelPipeline
- Fix a problem where invokeFreeInbound/OutboundBuffer() sets inbound/outboundShutdown too early (this was the direct cause of #1064)
- Remove the volatile modifier - DCHC.prev/next are volatile and that's just enough
This commit is contained in:
Trustin Lee 2013-02-21 15:19:42 -08:00
parent a9a8d5d8c2
commit 0f46d4b379
2 changed files with 63 additions and 34 deletions

View File

@ -740,7 +740,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public ChannelHandlerContext fireChannelUnregistered() { public ChannelHandlerContext fireChannelUnregistered() {
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (prev != null && executor.inEventLoop()) {
next.invokeChannelUnregistered(); next.invokeChannelUnregistered();
} else { } else {
Runnable task = next.invokeChannelUnregisteredTask; Runnable task = next.invokeChannelUnregisteredTask;
@ -749,6 +749,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override @Override
public void run() { public void run() {
next.invokeChannelUnregistered(); next.invokeChannelUnregistered();
if (prev == null) {
}
} }
}; };
} }
@ -801,7 +804,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
public ChannelHandlerContext fireChannelInactive() { public ChannelHandlerContext fireChannelInactive() {
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (prev != null && executor.inEventLoop()) {
next.invokeChannelInactive(); next.invokeChannelInactive();
} else { } else {
Runnable task = next.invokeChannelInactiveTask; Runnable task = next.invokeChannelInactiveTask;
@ -836,7 +839,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
final DefaultChannelHandlerContext next = this.next; final DefaultChannelHandlerContext next = this.next;
EventExecutor executor = next.executor(); EventExecutor executor = next.executor();
if (executor.inEventLoop()) { if (prev != null && executor.inEventLoop()) {
next.invokeExceptionCaught(cause); next.invokeExceptionCaught(cause);
} else { } else {
try { try {
@ -923,7 +926,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void fireInboundBufferUpdated0() { private void fireInboundBufferUpdated0() {
final DefaultChannelHandlerContext next = findContextInbound(); final DefaultChannelHandlerContext next = findContextInbound();
if (!next.isInboundBufferFreed()) { if (!pipeline.isInboundShutdown()) {
next.fillInboundBridge(); next.fillInboundBridge();
// This comparison is safe because this method is always executed from the executor. // This comparison is safe because this method is always executed from the executor.
if (next.executor == executor) { if (next.executor == executor) {
@ -934,7 +937,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
next.invokeInboundBufferUpdatedTask = task = new Runnable() { next.invokeInboundBufferUpdatedTask = task = new Runnable() {
@Override @Override
public void run() { public void run() {
if (!next.isInboundBufferFreed()) { if (!pipeline.isInboundShutdown()) {
next.invokeInboundBufferUpdated(); next.invokeInboundBufferUpdated();
} }
} }
@ -959,7 +962,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} catch (Throwable t) { } catch (Throwable t) {
pipeline.notifyHandlerException(t); pipeline.notifyHandlerException(t);
} finally { } finally {
if (handler instanceof ChannelInboundByteHandler && !isInboundBufferFreed()) { if (handler instanceof ChannelInboundByteHandler && !pipeline.isInboundShutdown()) {
try { try {
((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); ((ChannelInboundByteHandler) handler).discardInboundReadBytes(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -1282,7 +1285,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private void invokePrevFlush(ChannelPromise promise, Thread currentThread) { private void invokePrevFlush(ChannelPromise promise, Thread currentThread) {
DefaultChannelHandlerContext prev = findContextOutbound(); DefaultChannelHandlerContext prev = findContextOutbound();
if (prev.isOutboundBufferFreed()) { if (pipeline.isOutboundShutdown()) {
promise.setFailure(new ChannelPipelineException( promise.setFailure(new ChannelPipelineException(
"Unable to flush as outbound buffer of next handler was freed already")); "Unable to flush as outbound buffer of next handler was freed already"));
return; return;
@ -1324,7 +1327,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} catch (Throwable t) { } catch (Throwable t) {
pipeline.notifyHandlerException(t); pipeline.notifyHandlerException(t);
} finally { } finally {
if (handler instanceof ChannelOutboundByteHandler && !isOutboundBufferFreed()) { if (handler instanceof ChannelOutboundByteHandler && !pipeline.isOutboundShutdown()) {
try { try {
((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(this); ((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(this);
} catch (Throwable t) { } catch (Throwable t) {
@ -1446,7 +1449,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return; return;
} }
if (isOutboundBufferFreed()) { if (pipeline.isOutboundShutdown()) {
promise.setFailure(new ChannelPipelineException( promise.setFailure(new ChannelPipelineException(
"Unable to write as outbound buffer of next handler was freed already")); "Unable to write as outbound buffer of next handler was freed already"));
return; return;
@ -1461,9 +1464,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
void invokeFreeInboundBuffer() { void invokeFreeInboundBuffer() {
pipeline.inboundBufferFreed = true;
EventExecutor executor = executor(); EventExecutor executor = executor();
if (executor.inEventLoop()) { if (prev != null && executor.inEventLoop()) {
invokeFreeInboundBuffer0(); invokeFreeInboundBuffer0();
} else { } else {
Runnable task = invokeFreeInboundBuffer0Task; Runnable task = invokeFreeInboundBuffer0Task;
@ -1471,6 +1473,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
invokeFreeInboundBuffer0Task = task = new Runnable() { invokeFreeInboundBuffer0Task = task = new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.shutdownInbound();
invokeFreeInboundBuffer0(); invokeFreeInboundBuffer0();
} }
}; };
@ -1503,21 +1506,39 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
/** Invocation initiated by {@link #invokeFreeInboundBuffer0()} after freeing all inbound buffers. */ /** Invocation initiated by {@link #invokeFreeInboundBuffer0()} after freeing all inbound buffers. */
private void invokeFreeOutboundBuffer() { private void invokeFreeOutboundBuffer() {
pipeline.outboundBufferFreed = true;
EventExecutor executor = executor(); EventExecutor executor = executor();
if (executor.inEventLoop()) { if (next == null) {
invokeFreeOutboundBuffer0(); if (executor.inEventLoop()) {
} else { pipeline.shutdownOutbound();
Runnable task = invokeFreeOutboundBuffer0Task; invokeFreeOutboundBuffer0();
if (task == null) { } else {
invokeFreeOutboundBuffer0Task = task = new Runnable() { Runnable task = invokeFreeOutboundBuffer0Task;
@Override if (task == null) {
public void run() { invokeFreeOutboundBuffer0Task = task = new Runnable() {
invokeFreeOutboundBuffer0(); @Override
} public void run() {
}; pipeline.shutdownOutbound();
invokeFreeOutboundBuffer0();
}
};
}
executor.execute(task);
}
} else {
if (executor.inEventLoop()) {
invokeFreeOutboundBuffer0();
} else {
Runnable task = invokeFreeOutboundBuffer0Task;
if (task == null) {
invokeFreeOutboundBuffer0Task = task = new Runnable() {
@Override
public void run() {
invokeFreeOutboundBuffer0();
}
};
}
executor.execute(task);
} }
executor.execute(task);
} }
} }
@ -1569,14 +1590,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
} }
private boolean isInboundBufferFreed() {
return pipeline.inboundBufferFreed;
}
private boolean isOutboundBufferFreed() {
return pipeline.outboundBufferFreed;
}
private void validateFuture(ChannelFuture future) { private void validateFuture(ChannelFuture future) {
if (future == null) { if (future == null) {
throw new NullPointerException("future"); throw new NullPointerException("future");

View File

@ -58,8 +58,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final Map<EventExecutorGroup, EventExecutor> childExecutors = final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>(); new IdentityHashMap<EventExecutorGroup, EventExecutor>();
volatile boolean inboundBufferFreed; private boolean inboundShutdown;
volatile boolean outboundBufferFreed; private boolean outboundShutdown;
public DefaultChannelPipeline(Channel channel) { public DefaultChannelPipeline(Channel channel) {
if (channel == null) { if (channel == null) {
@ -845,6 +845,22 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return tail.nextOutboundByteBuffer(); return tail.nextOutboundByteBuffer();
} }
boolean isInboundShutdown() {
return inboundShutdown;
}
boolean isOutboundShutdown() {
return outboundShutdown;
}
void shutdownInbound() {
inboundShutdown = true;
}
void shutdownOutbound() {
outboundShutdown = true;
}
@Override @Override
public ChannelPipeline fireChannelRegistered() { public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered(); head.fireChannelRegistered();