Move the methods that's only used by DefaultChannelPipeline to DefaultChannelPipeline

This commit is contained in:
Trustin Lee 2013-03-14 15:01:35 +09:00
parent d55567e21b
commit 9c96791176
2 changed files with 93 additions and 87 deletions

View File

@ -24,20 +24,20 @@ import io.netty.buffer.Unpooled;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.PlatformDependent;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static io.netty.channel.DefaultChannelPipeline.*; import static io.netty.channel.DefaultChannelPipeline.*;
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
private static final int FLAG_REMOVED = 1;
private static final int FLAG_FREED = 2;
volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev; volatile DefaultChannelHandlerContext prev;
@ -56,6 +56,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private MessageBuf<Object> outMsgBuf; private MessageBuf<Object> outMsgBuf;
private ByteBuf outByteBuf; private ByteBuf outByteBuf;
private int flags;
// When the two handlers run in a different thread and they are next to each other, // When the two handlers run in a different thread and they are next to each other,
// each other's buffers can be accessed at the same time resulting in a race condition. // each other's buffers can be accessed at the same time resulting in a race condition.
// To avoid such situation, we lazily creates an additional thread-safe buffer called // To avoid such situation, we lazily creates an additional thread-safe buffer called
@ -93,8 +95,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private Runnable fireInboundBufferUpdated0Task; private Runnable fireInboundBufferUpdated0Task;
private Runnable invokeChannelReadSuspendedTask; private Runnable invokeChannelReadSuspendedTask;
private Runnable invokeRead0Task; private Runnable invokeRead0Task;
private boolean freed;
boolean removed;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
DefaultChannelHandlerContext( DefaultChannelHandlerContext(
@ -383,29 +383,38 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return true; return true;
} }
void freeHandlerBuffersAfterRemoval() { void setRemoved() {
if (!removed || freed) { flags |= FLAG_REMOVED;
return;
}
freed = true;
final ChannelHandler handler = handler();
if (handler instanceof ChannelInboundHandler) { // Free all buffers before completing removal.
try { if (!channel.isRegistered()) {
((ChannelInboundHandler) handler).freeInboundBuffer(this); freeHandlerBuffersAfterRemoval();
} catch (Exception e) {
notifyHandlerException(e);
} finally {
freeInboundBridge();
}
} }
if (handler instanceof ChannelOutboundHandler) { }
try {
((ChannelOutboundHandler) handler).freeOutboundBuffer(this); private void freeHandlerBuffersAfterRemoval() {
} catch (Exception e) { if (flags == FLAG_REMOVED) { // Removed, but not freed yet
notifyHandlerException(e); flags |= FLAG_FREED;
} finally {
freeOutboundBridge(); final ChannelHandler handler = handler();
if (handler instanceof ChannelInboundHandler) {
try {
((ChannelInboundHandler) handler).freeInboundBuffer(this);
} catch (Exception e) {
notifyHandlerException(e);
} finally {
freeInboundBridge();
}
}
if (handler instanceof ChannelOutboundHandler) {
try {
((ChannelOutboundHandler) handler).freeOutboundBuffer(this);
} catch (Exception e) {
notifyHandlerException(e);
} finally {
freeOutboundBridge();
}
} }
} }
} }
@ -530,54 +539,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return (MessageBuf<T>) outMsgBuf; return (MessageBuf<T>) outMsgBuf;
} }
/**
* Executes a task on the event loop and waits for it to finish. If the task is interrupted, then the
* current thread will be interrupted. It is expected that the task performs any appropriate locking.
* <p>
* If the {@link Runnable#run()} call throws a {@link Throwable}, but it is not an instance of
* {@link Error} or {@link RuntimeException}, then it is wrapped inside a
* {@link ChannelPipelineException} and that is thrown instead.</p>
*
* @param r execute this runnable
* @see Runnable#run()
* @see Future#get()
* @throws Error if the task threw this.
* @throws RuntimeException if the task threw this.
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
* {@link Throwable}.
*/
void executeOnEventLoop(Runnable r) {
waitForFuture(executor().submit(r));
}
/**
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
* It is expected that the task performs any appropriate locking.
* <p>
* If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error} or
* {@link RuntimeException}, then it is wrapped inside a {@link ChannelPipelineException} and that is
* thrown instead.</p>
*
* @param future wait for this future
* @see Future#get()
* @throws Error if the task threw this.
* @throws RuntimeException if the task threw this.
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
* {@link Throwable}.
*/
static void waitForFuture(Future<?> future) {
try {
future.get();
} catch (ExecutionException ex) {
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
PlatformDependent.throwException(ex.getCause());
} catch (InterruptedException ex) {
// Interrupt the calling thread (note that this method is not called from the event loop)
Thread.currentThread().interrupt();
}
}
@Override @Override
public ByteBuf nextInboundByteBuffer() { public ByteBuf nextInboundByteBuffer() {
DefaultChannelHandlerContext ctx = next; DefaultChannelHandlerContext ctx = next;
@ -939,7 +900,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} catch (Throwable t) { } catch (Throwable t) {
notifyHandlerException(t); notifyHandlerException(t);
} finally { } finally {
if (!freed) { if ((flags & FLAG_FREED) == 0) {
if (handler instanceof ChannelInboundByteHandler && !pipeline.isInboundShutdown()) { if (handler instanceof ChannelInboundByteHandler && !pipeline.isInboundShutdown()) {
try { try {
((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); ((ChannelInboundByteHandler) handler).discardInboundReadBytes(this);

View File

@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel.Unsafe; import io.netty.channel.Channel.Unsafe;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -36,10 +37,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.WeakHashMap; import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import static io.netty.channel.DefaultChannelHandlerContext.*;
/** /**
* The default {@link ChannelPipeline} implementation. It is usually created * The default {@link ChannelPipeline} implementation. It is usually created
* by a {@link Channel} implementation when the {@link Channel} is created. * by a {@link Channel} implementation when the {@link Channel} is created.
@ -128,7 +128,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// Run the following 'waiting' code outside of the above synchronized block // Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock // in order to avoid deadlock
newCtx.executeOnEventLoop(new Runnable() { executeOnEventLoop(newCtx, new Runnable() {
@Override @Override
public void run() { public void run() {
synchronized (DefaultChannelPipeline.this) { synchronized (DefaultChannelPipeline.this) {
@ -178,7 +178,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// Run the following 'waiting' code outside of the above synchronized block // Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock // in order to avoid deadlock
newCtx.executeOnEventLoop(new Runnable() { executeOnEventLoop(newCtx, new Runnable() {
@Override @Override
public void run() { public void run() {
synchronized (DefaultChannelPipeline.this) { synchronized (DefaultChannelPipeline.this) {
@ -232,7 +232,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// Run the following 'waiting' code outside of the above synchronized block // Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock // in order to avoid deadlock
newCtx.executeOnEventLoop(new Runnable() { executeOnEventLoop(newCtx, new Runnable() {
@Override @Override
public void run() { public void run() {
synchronized (DefaultChannelPipeline.this) { synchronized (DefaultChannelPipeline.this) {
@ -284,7 +284,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
// Run the following 'waiting' code outside of the above synchronized block // Run the following 'waiting' code outside of the above synchronized block
// in order to avoid deadlock // in order to avoid deadlock
newCtx.executeOnEventLoop(new Runnable() { executeOnEventLoop(newCtx, new Runnable() {
@Override @Override
public void run() { public void run() {
synchronized (DefaultChannelPipeline.this) { synchronized (DefaultChannelPipeline.this) {
@ -676,8 +676,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
private void callAfterRemove(final DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext ctxPrev, private static void callAfterRemove(
DefaultChannelHandlerContext ctxNext, boolean forward) { final DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext ctxPrev,
DefaultChannelHandlerContext ctxNext, boolean forward) {
final ChannelHandler handler = ctx.handler(); final ChannelHandler handler = ctx.handler();
// Notify the complete removal. // Notify the complete removal.
@ -695,11 +697,54 @@ final class DefaultChannelPipeline implements ChannelPipeline {
ctx.clearBuffer(); ctx.clearBuffer();
} }
ctx.removed = true; ctx.setRemoved();
}
// Free all buffers before completing removal. /**
if (!channel.isRegistered()) { * Executes a task on the event loop and waits for it to finish. If the task is interrupted, then the
ctx.freeHandlerBuffersAfterRemoval(); * current thread will be interrupted. It is expected that the task performs any appropriate locking.
* <p>
* If the {@link Runnable#run()} call throws a {@link Throwable}, but it is not an instance of
* {@link Error} or {@link RuntimeException}, then it is wrapped inside a
* {@link ChannelPipelineException} and that is thrown instead.</p>
*
* @param r execute this runnable
* @see Runnable#run()
* @see Future#get()
* @throws Error if the task threw this.
* @throws RuntimeException if the task threw this.
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
* {@link Throwable}.
*/
private static void executeOnEventLoop(DefaultChannelHandlerContext ctx, Runnable r) {
waitForFuture(ctx.executor().submit(r));
}
/**
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
* It is expected that the task performs any appropriate locking.
* <p>
* If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error} or
* {@link RuntimeException}, then it is wrapped inside a {@link ChannelPipelineException} and that is
* thrown instead.</p>
*
* @param future wait for this future
* @see Future#get()
* @throws Error if the task threw this.
* @throws RuntimeException if the task threw this.
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
* {@link Throwable}.
*/
private static void waitForFuture(Future<?> future) {
try {
future.get();
} catch (ExecutionException ex) {
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
PlatformDependent.throwException(ex.getCause());
} catch (InterruptedException ex) {
// Interrupt the calling thread (note that this method is not called from the event loop)
Thread.currentThread().interrupt();
} }
} }