diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioChannelFinder.java b/transport/src/main/java/io/netty/channel/socket/aio/AioChannelFinder.java new file mode 100644 index 0000000000..cbd1ee4145 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioChannelFinder.java @@ -0,0 +1,5 @@ +package io.netty.channel.socket.aio; + +interface AioChannelFinder { + AbstractAioChannel findChannel(Runnable command) throws Exception; +} \ No newline at end of file diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java index 2c77faf446..a1892dc044 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java @@ -15,47 +15,39 @@ */ package io.netty.channel.socket.aio; +import io.netty.channel.ChannelTaskScheduler; import io.netty.channel.EventExecutor; import io.netty.channel.EventLoopException; import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.channel.ChannelTaskScheduler; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DetectionUtil; import java.io.IOException; -import java.lang.reflect.Field; import java.nio.channels.AsynchronousChannelGroup; -import java.util.ArrayDeque; import java.util.Collections; -import java.util.Deque; import java.util.List; import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import sun.misc.Unsafe; - public class AioEventLoopGroup extends MultithreadEventLoopGroup { private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(AioEventLoopGroup.class); - private static final ChannelFinder CHANNEL_FINDER; + private static final AioChannelFinder CHANNEL_FINDER; static { - ChannelFinder finder; + AioChannelFinder finder; try { - // check if Unsafe is present on the classpath - // and if so try to instance the UnsafeChannelFinder - // too get the optimal speed. if (DetectionUtil.hasUnsafe()) { - finder = new UnsafeChannelFinder(); + finder = new UnsafeAioChannelFinder(); } else { - finder = new ReflectionChannelFinder(); + finder = new DefaultAioChannelFinder(); } } catch (Throwable t) { - LOGGER.debug("Unable to instance UnsafeChannelFinder fallback to ReflectionChannelFinder", t); - finder = new ReflectionChannelFinder(); + LOGGER.debug(String.format( + "Unable to instance the optimal %s implementation - falling back to %s.", + AioChannelFinder.class.getSimpleName(), DefaultAioChannelFinder.class.getSimpleName()), t); + finder = new DefaultAioChannelFinder(); } CHANNEL_FINDER = finder; } @@ -145,102 +137,4 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup { } } } - - interface ChannelFinder { - AbstractAioChannel findChannel(Runnable command) throws Exception; - } - - static class ReflectionChannelFinder implements ChannelFinder { - private static final ConcurrentMap, Field[]> fieldCache = new ConcurrentHashMap, Field[]>(); - private static final Field[] FAILURE = new Field[0]; - - @Override - public AbstractAioChannel findChannel(Runnable command) throws Exception { - Class commandType = command.getClass(); - Field[] fields = fieldCache.get(commandType); - if (fields == null) { - try { - fields = findFieldSequence(command, new ArrayDeque(2)); - } catch (Throwable t) { - // Failed to get the field list - } - - if (fields == null) { - fields = FAILURE; - } - - fieldCache.put(commandType, fields); // No need to use putIfAbsent() - } - - if (fields == FAILURE) { - return null; - } - - final int lastIndex = fields.length - 1; - for (int i = 0; i < lastIndex; i ++) { - command = (Runnable) get(fields[i], command); - } - - return (AbstractAioChannel) get(fields[lastIndex], command); - } - - private Field[] findFieldSequence(Runnable command, Deque fields) throws Exception { - Class commandType = command.getClass(); - for (Field f: commandType.getDeclaredFields()) { - if (f.getType() == Runnable.class) { - f.setAccessible(true); - fields.addLast(f); - try { - Field[] ret = findFieldSequence((Runnable) get(f, command), fields); - if (ret != null) { - return ret; - } - } finally { - fields.removeLast(); - } - } - - if (f.getType() == Object.class) { - f.setAccessible(true); - fields.addLast(f); - try { - Object candidate = get(f, command); - if (candidate instanceof AbstractAioChannel) { - return fields.toArray(new Field[fields.size()]); - } - } finally { - fields.removeLast(); - } - } - } - - return null; - } - - protected Object get(Field f, Object command) throws Exception { - return f.get(command); - } - } - - static class UnsafeChannelFinder extends ReflectionChannelFinder { - private static final Unsafe UNSAFE = getUnsafe(); - - @Override - protected Object get(Field f, Object command) throws Exception { - // using Unsafe to directly access the field. This should be - // faster then "pure" reflection - long offset = UNSAFE.objectFieldOffset(f); - return UNSAFE.getObject(command, offset); - } - - private static Unsafe getUnsafe() { - try { - Field singleoneInstanceField = Unsafe.class.getDeclaredField("theUnsafe"); - singleoneInstanceField.setAccessible(true); - return (Unsafe) singleoneInstanceField.get(null); - } catch (Throwable cause) { - throw new RuntimeException("Error while obtaining sun.misc.Unsafe", cause); - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/DefaultAioChannelFinder.java b/transport/src/main/java/io/netty/channel/socket/aio/DefaultAioChannelFinder.java new file mode 100644 index 0000000000..1d852e20d0 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/DefaultAioChannelFinder.java @@ -0,0 +1,79 @@ +package io.netty.channel.socket.aio; + +import java.lang.reflect.Field; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +class DefaultAioChannelFinder implements AioChannelFinder { + private static final ConcurrentMap, Field[]> fieldCache = new ConcurrentHashMap, Field[]>(); + private static final Field[] FAILURE = new Field[0]; + + @Override + public AbstractAioChannel findChannel(Runnable command) throws Exception { + Class commandType = command.getClass(); + Field[] fields = fieldCache.get(commandType); + if (fields == null) { + try { + fields = findFieldSequence(command, new ArrayDeque(2)); + } catch (Throwable t) { + // Failed to get the field list + } + + if (fields == null) { + fields = FAILURE; + } + + fieldCache.put(commandType, fields); // No need to use putIfAbsent() + } + + if (fields == FAILURE) { + return null; + } + + final int lastIndex = fields.length - 1; + for (int i = 0; i < lastIndex; i ++) { + command = (Runnable) get(fields[i], command); + } + + return (AbstractAioChannel) get(fields[lastIndex], command); + } + + private Field[] findFieldSequence(Runnable command, Deque fields) throws Exception { + Class commandType = command.getClass(); + for (Field f: commandType.getDeclaredFields()) { + if (f.getType() == Runnable.class) { + f.setAccessible(true); + fields.addLast(f); + try { + Field[] ret = findFieldSequence((Runnable) get(f, command), fields); + if (ret != null) { + return ret; + } + } finally { + fields.removeLast(); + } + } + + if (f.getType() == Object.class) { + f.setAccessible(true); + fields.addLast(f); + try { + Object candidate = get(f, command); + if (candidate instanceof AbstractAioChannel) { + return fields.toArray(new Field[fields.size()]); + } + } finally { + fields.removeLast(); + } + } + } + + return null; + } + + protected Object get(Field f, Object command) throws Exception { + return f.get(command); + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/UnsafeAioChannelFinder.java b/transport/src/main/java/io/netty/channel/socket/aio/UnsafeAioChannelFinder.java new file mode 100644 index 0000000000..b3e3f38061 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/UnsafeAioChannelFinder.java @@ -0,0 +1,28 @@ +package io.netty.channel.socket.aio; + +import java.lang.reflect.Field; + +import sun.misc.Unsafe; + +@SuppressWarnings("restriction") +class UnsafeAioChannelFinder extends DefaultAioChannelFinder { + private static final Unsafe UNSAFE = getUnsafe(); + + @Override + protected Object get(Field f, Object command) throws Exception { + // using Unsafe to directly access the field. This should be + // faster then "pure" reflection + long offset = UNSAFE.objectFieldOffset(f); + return UNSAFE.getObject(command, offset); + } + + private static Unsafe getUnsafe() { + try { + Field singleoneInstanceField = Unsafe.class.getDeclaredField("theUnsafe"); + singleoneInstanceField.setAccessible(true); + return (Unsafe) singleoneInstanceField.get(null); + } catch (Throwable cause) { + throw new RuntimeException("Error while obtaining sun.misc.Unsafe", cause); + } + } +}