Synchronized between 4.1 and master

Motivation:

4 and 5 were diverged long time ago and we recently reverted some of the
early commits in master.  We must make sure 4.1 and master are not very
different now.

Modification:

Fix found differences

Result:

4.1 and master got closer.
This commit is contained in:
Norman Maurer 2014-04-24 20:47:26 +02:00
parent d2614cfc01
commit 7fa861ab70
4 changed files with 76 additions and 59 deletions

View File

@ -36,6 +36,7 @@ import java.util.NoSuchElementException;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* The default {@link ChannelPipeline} implementation. It is usually created
@ -49,10 +50,21 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private static final WeakHashMap<Class<?>, String>[] nameCaches =
new WeakHashMap[Runtime.getRuntime().availableProcessors()];
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, Map> childInvokersUpdater;
static {
for (int i = 0; i < nameCaches.length; i ++) {
nameCaches[i] = new WeakHashMap<Class<?>, String>();
}
@SuppressWarnings("rawtypes")
AtomicReferenceFieldUpdater<DefaultChannelPipeline, Map> updater;
updater = PlatformDependent.newAtomicReferenceFieldUpdater(DefaultChannelPipeline.class, "childInvokers");
if (updater == null) {
updater = AtomicReferenceFieldUpdater.newUpdater(DefaultChannelPipeline.class, Map.class, "childInvokers");
}
childInvokersUpdater = updater;
}
final AbstractChannel channel;
@ -63,10 +75,15 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private final Map<String, DefaultChannelHandlerContext> name2ctx =
new HashMap<String, DefaultChannelHandlerContext>(4);
final Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers =
new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>();
/**
* Updated by {@link #childInvokersUpdater}.
*
* @see #findInvoker(EventExecutorGroup)
*/
@SuppressWarnings("UnusedDeclaration")
private volatile Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;
public DefaultChannelPipeline(AbstractChannel channel) {
DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
@ -101,13 +118,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, invoker, name, handler);
addFirst0(name, newCtx);
addFirst0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
}
return this;
}
@ -139,13 +151,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, invoker, name, handler);
addLast0(name, newCtx);
addLast0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
}
return this;
}
@ -178,13 +185,8 @@ final class DefaultChannelPipeline implements ChannelPipeline {
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, invoker, name, handler);
addBefore0(name, ctx, newCtx);
addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
}
return this;
}
@ -217,15 +219,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler) {
synchronized (this) {
DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx =
new DefaultChannelHandlerContext(this, invoker, name, handler);
addAfter0(name, ctx, newCtx);
addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
}
return this;
}
@ -308,9 +304,20 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return null;
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
ChannelHandlerInvoker invoker = childInvokers.get(group);
// Lazily initialize the data structure that maps an EventExecutorGroup to a ChannelHandlerInvoker.
Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers = this.childInvokers;
if (childInvokers == null) {
childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>();
if (!childInvokersUpdater.compareAndSet(this, null, childInvokers)) {
childInvokers = this.childInvokers;
}
}
// Pick one of the child executors and remember its invoker
// so that the same invoker is used to fire events for the same channel.
ChannelHandlerInvoker invoker;
synchronized (childInvokers) {
invoker = childInvokers.get(group);
if (invoker == null) {
EventExecutor executor = group.next();
if (executor instanceof EventLoop) {
@ -320,6 +327,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
childInvokers.put(group, invoker);
}
}
return invoker;
}
@ -550,7 +558,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
try {
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
t.printStackTrace();
boolean removed = false;
try {
remove(ctx);
@ -936,7 +943,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return tail.close(promise);
return tail.deregister(promise);
}
@Override
@ -1043,9 +1050,9 @@ final class DefaultChannelPipeline implements ChannelPipeline {
static final class HeadHandler extends ChannelHandlerAdapter {
protected final Unsafe unsafe;
private final Unsafe unsafe;
protected HeadHandler(Unsafe unsafe) {
HeadHandler(Unsafe unsafe) {
this.unsafe = unsafe;
}

View File

@ -109,14 +109,27 @@ public final class NioDatagramChannel
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* Create a new instance using the given {@link SelectorProvider}
* which will use the Operation Systems default {@link InternetProtocolFamily}.
*/
public NioDatagramChannel(SelectorProvider provider) {
this(newSocket(provider));
}
/**
* Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
* on the Operation Systems default which will be chosen.
*/
public NioDatagramChannel(InternetProtocolFamily ipFamily) {
this(DEFAULT_SELECTOR_PROVIDER, ipFamily);
this(newSocket(DEFAULT_SELECTOR_PROVIDER, ipFamily));
}
/**
* Create a new instance using the given {@link SelectorProvider} and {@link InternetProtocolFamily}.
* If {@link InternetProtocolFamily} is {@code null} it will depend on the Operation Systems default
* which will be chosen.
*/
public NioDatagramChannel(SelectorProvider provider, InternetProtocolFamily ipFamily) {
this(newSocket(provider, ipFamily));
}

View File

@ -67,11 +67,21 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
* Create a new instance
*/
public NioServerSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioServerSocketChannel(SelectorProvider provider) {
super(null, newSocket(provider), SelectionKey.OP_ACCEPT);
this(newSocket(provider));
}
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

View File

@ -23,8 +23,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import java.io.UnsupportedEncodingException;
import static org.junit.Assert.*;
class BaseChannelTest {
@ -66,18 +64,6 @@ class BaseChannelTest {
return buf;
}
static Object createTestBuf(String string) throws UnsupportedEncodingException {
byte[] buf = string.getBytes("US-ASCII");
return createTestBuf(buf);
}
static Object createTestBuf(byte[] buf) {
ByteBuf ret = createTestBuf(buf.length);
ret.clear();
ret.writeBytes(buf);
return ret;
}
void assertLog(String expected) {
String actual = loggingHandler.getLog();
assertEquals(expected, actual);
@ -90,4 +76,5 @@ class BaseChannelTest {
void setInterest(LoggingHandler.Event... events) {
loggingHandler.setInterest(events);
}
}