Undo AbstractChannelHandlerContext extend PausableChannelEventExecutor. Fixes #2814

Motivation:
Our last-minute change that made AbstractChannelHandlerContext implement PausableChannelEventExecutor broke the socksproxy example.

Modifications:
AbstractChannelHandlerContext does not inherit from PausableChannelEventExecutor anymore. Instead we'll allocate an extra object on demand.

Result:

AbstractChannelHandlerContext.executor().newPromise() returns the correct type.
This commit is contained in:
Jakob Buchgraber 2014-08-25 15:01:13 +02:00 committed by Norman Maurer
parent de6e73e3ea
commit 0381fa67a8
2 changed files with 103 additions and 36 deletions

View File

@ -28,16 +28,12 @@ import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Abstract base class for {@link ChannelHandlerContext} implementations.
* <p>
* The unusual is-a relationship with {@link PausableChannelEventExecutor} was put in place to avoid
* additional object allocations that would have been required to wrap the return values of {@link #executor()}
* and {@link #invoker()}.
*/
abstract class AbstractChannelHandlerContext
extends PausableChannelEventExecutor implements ChannelHandlerContext, ResourceLeakHint {
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
// This class keeps an integer member field 'skipFlags' whose each bit tells if the corresponding handler method
// is annotated with @Skip. 'skipFlags' is retrieved in runtime via the reflection API and is cached.
@ -92,10 +88,21 @@ abstract class AbstractChannelHandlerContext
private static final WeakHashMap<Class<?>, Integer>[] skipFlagsCache =
new WeakHashMap[Runtime.getRuntime().availableProcessors()];
private static final AtomicReferenceFieldUpdater<AbstractChannelHandlerContext, PausableChannelEventExecutor>
WRAPPED_EVENTEXECUTOR_UPDATER;
static {
for (int i = 0; i < skipFlagsCache.length; i ++) {
skipFlagsCache[i] = new WeakHashMap<Class<?>, Integer>();
}
AtomicReferenceFieldUpdater<AbstractChannelHandlerContext, PausableChannelEventExecutor> updater =
PlatformDependent.newAtomicReferenceFieldUpdater(
AbstractChannelHandlerContext.class, "wrappedEventLoop");
if (updater == null) {
updater = AtomicReferenceFieldUpdater.newUpdater(
AbstractChannelHandlerContext.class, PausableChannelEventExecutor.class, "wrappedEventLoop");
}
WRAPPED_EVENTEXECUTOR_UPDATER = updater;
}
/**
@ -226,6 +233,11 @@ abstract class AbstractChannelHandlerContext
volatile Runnable invokeFlushTask;
volatile Runnable invokeChannelWritableStateChangedTask;
/**
* Wrapped {@link EventLoop} and {@link ChannelHandlerInvoker} to support {@link Channel#deregister()}.
*/
private volatile PausableChannelEventExecutor wrappedEventLoop;
AbstractChannelHandlerContext(
DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, int skipFlags) {
@ -286,12 +298,32 @@ abstract class AbstractChannelHandlerContext
@Override
public final EventExecutor executor() {
return this;
if (invoker == null) {
return channel().eventLoop();
} else {
return wrappedEventLoop();
}
}
@Override
public final ChannelHandlerInvoker invoker() {
return this;
if (invoker == null) {
return channel().eventLoop().asInvoker();
} else {
return wrappedEventLoop();
}
}
private PausableChannelEventExecutor wrappedEventLoop() {
PausableChannelEventExecutor wrapped = wrappedEventLoop;
if (wrapped == null) {
wrapped = new PausableChannelEventExecutor0();
if (!WRAPPED_EVENTEXECUTOR_UPDATER.compareAndSet(this, null, wrapped)) {
// Set in the meantime so we need to issue another volatile read
return wrappedEventLoop;
}
}
return wrapped;
}
@Override
@ -554,35 +586,44 @@ abstract class AbstractChannelHandlerContext
return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel + ')';
}
@Override
public EventExecutor unwrap() {
return unwrapInvoker().executor();
}
private final class PausableChannelEventExecutor0 extends PausableChannelEventExecutor {
@Override
public void rejectNewTasks() {
/**
* This cast is correct because {@link #channel()} always returns an {@link AbstractChannel} and
* {@link AbstractChannel#eventLoop()} always returns a {@link PausableEventExecutor}.
*/
((PausableEventExecutor) channel().eventLoop()).rejectNewTasks();
}
@Override
public void acceptNewTasks() {
((PausableEventExecutor) channel().eventLoop()).acceptNewTasks();
}
@Override
public boolean isAcceptingNewTasks() {
return ((PausableEventExecutor) channel().eventLoop()).isAcceptingNewTasks();
}
@Override
ChannelHandlerInvoker unwrapInvoker() {
if (invoker == null) {
return channel().unsafe().invoker();
@Override
public void rejectNewTasks() {
/**
* This cast is correct because {@link #channel()} always returns an {@link AbstractChannel} and
* {@link AbstractChannel#eventLoop()} always returns a {@link PausableChannelEventExecutor}.
*/
((PausableEventExecutor) channel().eventLoop()).rejectNewTasks();
}
@Override
public void acceptNewTasks() {
((PausableEventExecutor) channel().eventLoop()).acceptNewTasks();
}
@Override
public boolean isAcceptingNewTasks() {
return ((PausableEventExecutor) channel().eventLoop()).isAcceptingNewTasks();
}
@Override
public Channel channel() {
return AbstractChannelHandlerContext.this.channel();
}
@Override
public EventExecutor unwrap() {
return unwrapInvoker().executor();
}
@Override
public ChannelHandlerInvoker unwrapInvoker() {
/**
* {@link #invoker} can not be {@code null}, because {@link PausableChannelEventExecutor0} will only be
* instantiated if {@link #invoker} is not {@code null}.
*/
return invoker;
}
return invoker;
}
}

View File

@ -17,6 +17,7 @@ package io.netty.channel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
@ -25,7 +26,9 @@ import io.netty.util.NetUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.PausableEventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.Test;
import java.net.InetSocketAddress;
@ -199,6 +202,29 @@ public class ChannelDeregistrationTest {
.unwrap());
}
/**
* See https://github.com/netty/netty/issues/2814
*/
@Test(timeout = 1000)
public void testPromise() {
ChannelHandler handler = new TestChannelHandler1();
AbstractChannel ch = new EmbeddedChannel(handler);
DefaultChannelPipeline p = new DefaultChannelPipeline(ch);
DefaultChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(GlobalEventExecutor.INSTANCE);
ChannelHandlerContext ctx = new DefaultChannelHandlerContext(p, invoker, "Test", handler);
// Make sure no ClassCastException is thrown
Promise<Integer> promise = ctx.executor().newPromise();
promise.setSuccess(0);
assertTrue(promise.isSuccess());
ctx = new DefaultChannelHandlerContext(p, null, "Test", handler);
// Make sure no ClassCastException is thrown
promise = ctx.executor().newPromise();
promise.setSuccess(0);
assertTrue(promise.isSuccess());
}
private static final class TestChannelHandler1 extends ChannelHandlerAdapter { }
private static final class TestChannelHandler2 extends ChannelHandlerAdapter {