Use Unsafe when possible to access AbstractAioChannel to prevent slow Reflection usage. This is kind of related to #528
This commit is contained in:
parent
65f8fbb82c
commit
967b5424c5
@ -19,6 +19,9 @@ import io.netty.channel.EventExecutor;
|
||||
import io.netty.channel.EventLoopException;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.ChannelTaskScheduler;
|
||||
import io.netty.logging.InternalLogger;
|
||||
import io.netty.logging.InternalLoggerFactory;
|
||||
import io.netty.util.internal.DetectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
@ -33,10 +36,29 @@ import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class AioEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
private static final ConcurrentMap<Class<?>, Field[]> fieldCache = new ConcurrentHashMap<Class<?>, Field[]>();
|
||||
private static final Field[] FAILURE = new Field[0];
|
||||
public class AioEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(AioEventLoopGroup.class);
|
||||
private static final ChannelFinder CHANNEL_FINDER;
|
||||
|
||||
static {
|
||||
ChannelFinder finder;
|
||||
try {
|
||||
// check if Unsafe is present on the classpath
|
||||
// and if so try to instance the UnsafeChannelFinder
|
||||
// too get the optimal speed.
|
||||
if (DetectionUtil.hasUnsafe()) {
|
||||
finder = new UnsafeChannelFinder();
|
||||
} else {
|
||||
finder = new ReflectionChannelFinder();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOGGER.debug("Unable to instance UnsafeChannelFinder fallback to ReflectionChannelFinder", t);
|
||||
finder = new ReflectionChannelFinder();
|
||||
}
|
||||
CHANNEL_FINDER = finder;
|
||||
}
|
||||
|
||||
final AsynchronousChannelGroup group;
|
||||
|
||||
@ -66,7 +88,7 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
private void executeAioTask(Runnable command) {
|
||||
AbstractAioChannel ch = null;
|
||||
try {
|
||||
ch = findChannel(command);
|
||||
ch = CHANNEL_FINDER.findChannel(command);
|
||||
} catch (Throwable t) {
|
||||
// Ignore
|
||||
}
|
||||
@ -85,68 +107,6 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
}
|
||||
}
|
||||
|
||||
private static AbstractAioChannel findChannel(Runnable command) throws Exception {
|
||||
Class<?> commandType = command.getClass();
|
||||
Field[] fields = fieldCache.get(commandType);
|
||||
if (fields == null) {
|
||||
try {
|
||||
fields = findFieldSequence(command, new ArrayDeque<Field>(2));
|
||||
} catch (Throwable t) {
|
||||
// Failed to get the field list
|
||||
}
|
||||
|
||||
if (fields == null) {
|
||||
fields = FAILURE;
|
||||
}
|
||||
|
||||
fieldCache.put(commandType, fields); // No need to use putIfAbsent()
|
||||
}
|
||||
|
||||
if (fields == FAILURE) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final int lastIndex = fields.length - 1;
|
||||
for (int i = 0; i < lastIndex; i ++) {
|
||||
command = (Runnable) fields[i].get(command);
|
||||
}
|
||||
|
||||
return (AbstractAioChannel) fields[lastIndex].get(command);
|
||||
}
|
||||
|
||||
private static Field[] findFieldSequence(Runnable command, Deque<Field> fields) throws Exception {
|
||||
Class<?> commandType = command.getClass();
|
||||
for (Field f: commandType.getDeclaredFields()) {
|
||||
if (f.getType() == Runnable.class) {
|
||||
f.setAccessible(true);
|
||||
fields.addLast(f);
|
||||
try {
|
||||
Field[] ret = findFieldSequence((Runnable) f.get(command), fields);
|
||||
if (ret != null) {
|
||||
return ret;
|
||||
}
|
||||
} finally {
|
||||
fields.removeLast();
|
||||
}
|
||||
}
|
||||
|
||||
if (f.getType() == Object.class) {
|
||||
f.setAccessible(true);
|
||||
fields.addLast(f);
|
||||
try {
|
||||
Object candidate = f.get(command);
|
||||
if (candidate instanceof AbstractAioChannel) {
|
||||
return fields.toArray(new Field[fields.size()]);
|
||||
}
|
||||
} finally {
|
||||
fields.removeLast();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private final class AioExecutorService extends AbstractExecutorService {
|
||||
|
||||
@Override
|
||||
@ -185,4 +145,102 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface ChannelFinder {
|
||||
AbstractAioChannel findChannel(Runnable command) throws Exception;
|
||||
}
|
||||
|
||||
static class ReflectionChannelFinder implements ChannelFinder {
|
||||
private static final ConcurrentMap<Class<?>, Field[]> fieldCache = new ConcurrentHashMap<Class<?>, Field[]>();
|
||||
private static final Field[] FAILURE = new Field[0];
|
||||
|
||||
@Override
|
||||
public AbstractAioChannel findChannel(Runnable command) throws Exception {
|
||||
Class<?> commandType = command.getClass();
|
||||
Field[] fields = fieldCache.get(commandType);
|
||||
if (fields == null) {
|
||||
try {
|
||||
fields = findFieldSequence(command, new ArrayDeque<Field>(2));
|
||||
} catch (Throwable t) {
|
||||
// Failed to get the field list
|
||||
}
|
||||
|
||||
if (fields == null) {
|
||||
fields = FAILURE;
|
||||
}
|
||||
|
||||
fieldCache.put(commandType, fields); // No need to use putIfAbsent()
|
||||
}
|
||||
|
||||
if (fields == FAILURE) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final int lastIndex = fields.length - 1;
|
||||
for (int i = 0; i < lastIndex; i ++) {
|
||||
command = (Runnable) get(fields[i], command);
|
||||
}
|
||||
|
||||
return (AbstractAioChannel) get(fields[lastIndex], command);
|
||||
}
|
||||
|
||||
private Field[] findFieldSequence(Runnable command, Deque<Field> fields) throws Exception {
|
||||
Class<?> commandType = command.getClass();
|
||||
for (Field f: commandType.getDeclaredFields()) {
|
||||
if (f.getType() == Runnable.class) {
|
||||
f.setAccessible(true);
|
||||
fields.addLast(f);
|
||||
try {
|
||||
Field[] ret = findFieldSequence((Runnable) get(f, command), fields);
|
||||
if (ret != null) {
|
||||
return ret;
|
||||
}
|
||||
} finally {
|
||||
fields.removeLast();
|
||||
}
|
||||
}
|
||||
|
||||
if (f.getType() == Object.class) {
|
||||
f.setAccessible(true);
|
||||
fields.addLast(f);
|
||||
try {
|
||||
Object candidate = get(f, command);
|
||||
if (candidate instanceof AbstractAioChannel) {
|
||||
return fields.toArray(new Field[fields.size()]);
|
||||
}
|
||||
} finally {
|
||||
fields.removeLast();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
protected Object get(Field f, Object command) throws Exception {
|
||||
return f.get(command);
|
||||
}
|
||||
}
|
||||
|
||||
static class UnsafeChannelFinder extends ReflectionChannelFinder {
|
||||
private static final Unsafe UNSAFE = getUnsafe();
|
||||
|
||||
@Override
|
||||
protected Object get(Field f, Object command) throws Exception {
|
||||
// using Unsafe to directly access the field. This should be
|
||||
// faster then "pure" reflection
|
||||
long offset = UNSAFE.objectFieldOffset(f);
|
||||
return UNSAFE.getObject(command, offset);
|
||||
}
|
||||
|
||||
private static Unsafe getUnsafe() {
|
||||
try {
|
||||
Field singleoneInstanceField = Unsafe.class.getDeclaredField("theUnsafe");
|
||||
singleoneInstanceField.setAccessible(true);
|
||||
return (Unsafe) singleoneInstanceField.get(null);
|
||||
} catch (Throwable cause) {
|
||||
throw new RuntimeException("Error while obtaining sun.misc.Unsafe", cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user