From 967b5424c579964512092426812a7c6b2001b9fd Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 25 Aug 2012 22:50:31 +0200 Subject: [PATCH] Use Unsafe when possible to access AbstractAioChannel to prevent slow Reflection usage. This is kind of related to #528 --- .../channel/socket/aio/AioEventLoopGroup.java | 190 ++++++++++++------ 1 file changed, 124 insertions(+), 66 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java index f73f847e2d..2c77faf446 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoopGroup.java @@ -19,6 +19,9 @@ import io.netty.channel.EventExecutor; import io.netty.channel.EventLoopException; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.ChannelTaskScheduler; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import io.netty.util.internal.DetectionUtil; import java.io.IOException; import java.lang.reflect.Field; @@ -33,10 +36,29 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -public class AioEventLoopGroup extends MultithreadEventLoopGroup { +import sun.misc.Unsafe; - private static final ConcurrentMap, Field[]> fieldCache = new ConcurrentHashMap, Field[]>(); - private static final Field[] FAILURE = new Field[0]; +public class AioEventLoopGroup extends MultithreadEventLoopGroup { + private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(AioEventLoopGroup.class); + private static final ChannelFinder CHANNEL_FINDER; + + static { + ChannelFinder finder; + try { + // check if Unsafe is present on the classpath + // and if so try to instance the UnsafeChannelFinder + // too get the optimal speed. + if (DetectionUtil.hasUnsafe()) { + finder = new UnsafeChannelFinder(); + } else { + finder = new ReflectionChannelFinder(); + } + } catch (Throwable t) { + LOGGER.debug("Unable to instance UnsafeChannelFinder fallback to ReflectionChannelFinder", t); + finder = new ReflectionChannelFinder(); + } + CHANNEL_FINDER = finder; + } final AsynchronousChannelGroup group; @@ -66,7 +88,7 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup { private void executeAioTask(Runnable command) { AbstractAioChannel ch = null; try { - ch = findChannel(command); + ch = CHANNEL_FINDER.findChannel(command); } catch (Throwable t) { // Ignore } @@ -85,68 +107,6 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup { } } - private static AbstractAioChannel findChannel(Runnable command) throws Exception { - Class commandType = command.getClass(); - Field[] fields = fieldCache.get(commandType); - if (fields == null) { - try { - fields = findFieldSequence(command, new ArrayDeque(2)); - } catch (Throwable t) { - // Failed to get the field list - } - - if (fields == null) { - fields = FAILURE; - } - - fieldCache.put(commandType, fields); // No need to use putIfAbsent() - } - - if (fields == FAILURE) { - return null; - } - - final int lastIndex = fields.length - 1; - for (int i = 0; i < lastIndex; i ++) { - command = (Runnable) fields[i].get(command); - } - - return (AbstractAioChannel) fields[lastIndex].get(command); - } - - private static Field[] findFieldSequence(Runnable command, Deque fields) throws Exception { - Class commandType = command.getClass(); - for (Field f: commandType.getDeclaredFields()) { - if (f.getType() == Runnable.class) { - f.setAccessible(true); - fields.addLast(f); - try { - Field[] ret = findFieldSequence((Runnable) f.get(command), fields); - if (ret != null) { - return ret; - } - } finally { - fields.removeLast(); - } - } - - if (f.getType() == Object.class) { - f.setAccessible(true); - fields.addLast(f); - try { - Object candidate = f.get(command); - if (candidate instanceof AbstractAioChannel) { - return fields.toArray(new Field[fields.size()]); - } - } finally { - fields.removeLast(); - } - } - } - - return null; - } - private final class AioExecutorService extends AbstractExecutorService { @Override @@ -185,4 +145,102 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup { } } } + + interface ChannelFinder { + AbstractAioChannel findChannel(Runnable command) throws Exception; + } + + static class ReflectionChannelFinder implements ChannelFinder { + private static final ConcurrentMap, Field[]> fieldCache = new ConcurrentHashMap, Field[]>(); + private static final Field[] FAILURE = new Field[0]; + + @Override + public AbstractAioChannel findChannel(Runnable command) throws Exception { + Class commandType = command.getClass(); + Field[] fields = fieldCache.get(commandType); + if (fields == null) { + try { + fields = findFieldSequence(command, new ArrayDeque(2)); + } catch (Throwable t) { + // Failed to get the field list + } + + if (fields == null) { + fields = FAILURE; + } + + fieldCache.put(commandType, fields); // No need to use putIfAbsent() + } + + if (fields == FAILURE) { + return null; + } + + final int lastIndex = fields.length - 1; + for (int i = 0; i < lastIndex; i ++) { + command = (Runnable) get(fields[i], command); + } + + return (AbstractAioChannel) get(fields[lastIndex], command); + } + + private Field[] findFieldSequence(Runnable command, Deque fields) throws Exception { + Class commandType = command.getClass(); + for (Field f: commandType.getDeclaredFields()) { + if (f.getType() == Runnable.class) { + f.setAccessible(true); + fields.addLast(f); + try { + Field[] ret = findFieldSequence((Runnable) get(f, command), fields); + if (ret != null) { + return ret; + } + } finally { + fields.removeLast(); + } + } + + if (f.getType() == Object.class) { + f.setAccessible(true); + fields.addLast(f); + try { + Object candidate = get(f, command); + if (candidate instanceof AbstractAioChannel) { + return fields.toArray(new Field[fields.size()]); + } + } finally { + fields.removeLast(); + } + } + } + + return null; + } + + protected Object get(Field f, Object command) throws Exception { + return f.get(command); + } + } + + static class UnsafeChannelFinder extends ReflectionChannelFinder { + private static final Unsafe UNSAFE = getUnsafe(); + + @Override + protected Object get(Field f, Object command) throws Exception { + // using Unsafe to directly access the field. This should be + // faster then "pure" reflection + long offset = UNSAFE.objectFieldOffset(f); + return UNSAFE.getObject(command, offset); + } + + private static Unsafe getUnsafe() { + try { + Field singleoneInstanceField = Unsafe.class.getDeclaredField("theUnsafe"); + singleoneInstanceField.setAccessible(true); + return (Unsafe) singleoneInstanceField.get(null); + } catch (Throwable cause) { + throw new RuntimeException("Error while obtaining sun.misc.Unsafe", cause); + } + } + } }