Refactor the aio transport to not depend on the AioChannelFinder and so not need for refelection

This commit is contained in:
Norman Maurer 2013-02-18 14:23:02 +01:00
parent 17641d52fb
commit fada776756
9 changed files with 14 additions and 460 deletions

View File

@ -1,32 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
/**
* Allow to fine the {@link AbstractAioChannel} for a task.
*/
interface AioChannelFinder {
/**
* Try to find the {@link AbstractAioChannel} for the given {@link Runnable}.
*
* @param command the {@link Runnable} for which the {@link AbstractAioChannel} should be found.
* @return channel the {@link AbstractAioChannel} which belongs to the {@link Runnable} or {@code null} if
* it could not found.
* @throws Exception will get thrown if an error accours.
*/
AbstractAioChannel findChannel(Runnable command) throws Exception;
}

View File

@ -35,30 +35,12 @@ public abstract class AioCompletionHandler<V, A extends Channel> implements Comp
*/ */
protected abstract void failed0(Throwable exc, A channel); protected abstract void failed0(Throwable exc, A channel);
private static final int MAX_STACK_DEPTH = 4;
private static final ThreadLocal<Integer> STACK_DEPTH = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
@Override @Override
public final void completed(final V result, final A channel) { public final void completed(final V result, final A channel) {
EventLoop loop = channel.eventLoop(); EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
Integer d = STACK_DEPTH.get();
if (d < MAX_STACK_DEPTH) {
STACK_DEPTH.set(d + 1);
try {
completed0(result, channel); completed0(result, channel);
} finally { } else {
STACK_DEPTH.set(d);
}
return;
}
}
loop.execute(new Runnable() { loop.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -66,23 +48,14 @@ public abstract class AioCompletionHandler<V, A extends Channel> implements Comp
} }
}); });
} }
}
@Override @Override
public final void failed(final Throwable exc, final A channel) { public final void failed(final Throwable exc, final A channel) {
EventLoop loop = channel.eventLoop(); EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
Integer d = STACK_DEPTH.get();
if (d < MAX_STACK_DEPTH) {
STACK_DEPTH.set(d + 1);
try {
failed0(exc, channel); failed0(exc, channel);
} finally { } else {
STACK_DEPTH.set(d);
}
return;
}
}
loop.execute(new Runnable() { loop.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -90,4 +63,5 @@ public abstract class AioCompletionHandler<V, A extends Channel> implements Comp
} }
}); });
} }
}
} }

View File

@ -20,9 +20,6 @@ import io.netty.channel.ChannelTaskScheduler;
import io.netty.channel.EventExecutor; import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoopException; import io.netty.channel.EventLoopException;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.internal.InternalLogger;
import io.netty.util.internal.InternalLoggerFactory;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousChannelGroup;
@ -38,26 +35,6 @@ import java.util.concurrent.TimeUnit;
* *
*/ */
public class AioEventLoopGroup extends MultithreadEventLoopGroup { public class AioEventLoopGroup extends MultithreadEventLoopGroup {
private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(AioEventLoopGroup.class);
private static final AioChannelFinder CHANNEL_FINDER;
static {
AioChannelFinder finder;
try {
if (PlatformDependent.hasUnsafe()) {
finder = new UnsafeAioChannelFinder();
} else {
finder = new ReflectiveAioChannelFinder();
}
} catch (Throwable t) {
LOGGER.debug(String.format(
"Failed to instantiate the optimal %s implementation - falling back to %s.",
AioChannelFinder.class.getSimpleName(), ReflectiveAioChannelFinder.class.getSimpleName()), t);
finder = new ReflectiveAioChannelFinder();
}
CHANNEL_FINDER = finder;
}
private final AioExecutorService groupExecutor = new AioExecutorService(); private final AioExecutorService groupExecutor = new AioExecutorService();
private final AsynchronousChannelGroup group; private final AsynchronousChannelGroup group;
@ -135,7 +112,7 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
return new AioEventLoop(this, threadFactory, scheduler); return new AioEventLoop(this, threadFactory, scheduler);
} }
private final class AioExecutorService extends AbstractExecutorService { private static final class AioExecutorService extends AbstractExecutorService {
// It does not shut down the underlying EventExecutor - it merely pretends to be shut down. // It does not shut down the underlying EventExecutor - it merely pretends to be shut down.
// The actual shut down is done by EventLoopGroup and EventLoop implementation. // The actual shut down is done by EventLoopGroup and EventLoop implementation.
@ -169,34 +146,7 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
@Override @Override
public void execute(Runnable command) { public void execute(Runnable command) {
Class<? extends Runnable> commandType = command.getClass();
if (commandType.getName().startsWith("sun.nio.ch.")) {
executeAioTask(command);
} else {
next().execute(command);
}
}
private void executeAioTask(Runnable command) {
AbstractAioChannel ch = null;
try {
ch = CHANNEL_FINDER.findChannel(command);
} catch (Throwable t) {
// Ignore
}
EventExecutor l;
if (ch != null) {
l = ch.eventLoop();
} else {
l = next();
}
if (l.isShutdown()) {
command.run(); command.run();
} else {
l.execute(command);
}
} }
} }
} }

View File

@ -1,77 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
/**
* {@link AioChannelFinder} implementation which use reflection for find the right {@link AbstractAioChannel}.
*/
final class ReflectiveAioChannelFinder implements AioChannelFinder {
private static volatile Map<Class<?>, Field> fieldCache = new HashMap<Class<?>, Field>();
@Override
public AbstractAioChannel findChannel(Runnable command) throws Exception {
Field f;
for (;;) {
f = findField(command);
if (f == null) {
return null;
}
Object next = f.get(command);
if (next instanceof AbstractAioChannel) {
return (AbstractAioChannel) next;
}
command = (Runnable) next;
}
}
private static Field findField(Object command) throws Exception {
Map<Class<?>, Field> fieldCache = ReflectiveAioChannelFinder.fieldCache;
Class<?> commandType = command.getClass();
Field res = fieldCache.get(commandType);
if (res != null) {
return res;
}
for (Field f: commandType.getDeclaredFields()) {
if (f.getType() == Runnable.class) {
f.setAccessible(true);
put(fieldCache, commandType, f);
return f;
}
if (f.getType() == Object.class) {
f.setAccessible(true);
Object candidate = f.get(command);
if (candidate instanceof AbstractAioChannel) {
put(fieldCache, commandType, f);
return f;
}
}
}
return null;
}
private static void put(Map<Class<?>, Field> oldCache, Class<?> key, Field value) {
Map<Class<?>, Field> newCache = new HashMap<Class<?>, Field>(oldCache.size());
newCache.putAll(oldCache);
newCache.put(key, value);
fieldCache = newCache;
}
}

View File

@ -1,80 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
import io.netty.util.internal.PlatformDependent;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
/**
* {@link AioChannelFinder} implementation which uses {@code sun.misc.Unsafe}.
*/
final class UnsafeAioChannelFinder implements AioChannelFinder {
private static volatile Map<Class<?>, Long> offsetCache = new HashMap<Class<?>, Long>();
@Override
public AbstractAioChannel findChannel(Runnable command) throws Exception {
Long offset;
for (;;) {
offset = findFieldOffset(command);
if (offset == null) {
return null;
}
Object next = PlatformDependent.getObject(command, offset);
if (next instanceof AbstractAioChannel) {
return (AbstractAioChannel) next;
}
command = (Runnable) next;
}
}
private static Long findFieldOffset(Object command) throws Exception {
Map<Class<?>, Long> offsetCache = UnsafeAioChannelFinder.offsetCache;
Class<?> commandType = command.getClass();
Long res = offsetCache.get(commandType);
if (res != null) {
return res;
}
for (Field f: commandType.getDeclaredFields()) {
if (f.getType() == Runnable.class) {
res = PlatformDependent.objectFieldOffset(f);
put(offsetCache, commandType, res);
return res;
}
if (f.getType() == Object.class) {
f.setAccessible(true);
Object candidate = f.get(command);
if (candidate instanceof AbstractAioChannel) {
res = PlatformDependent.objectFieldOffset(f);
put(offsetCache, commandType, res);
return res;
}
}
}
return null;
}
private static void put(Map<Class<?>, Long> oldCache, Class<?> key, Long value) {
Map<Class<?>, Long> newCache = new HashMap<Class<?>, Long>(oldCache.size());
newCache.putAll(oldCache);
newCache.put(key, value);
offsetCache = newCache;
}
}

View File

@ -1,98 +0,0 @@
package io.netty.channel.aio;
import static org.junit.Assert.*;
import static org.easymock.EasyMock.*;
import org.junit.Test;
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
public abstract class AbstractAioChannelFinderTest {
protected abstract AioChannelFinder create();
@Test
public void testNull() throws Exception {
AioChannelFinder finder = create();
AbstractAioChannel channel = finder.findChannel(new Runnable() {
@Override
public void run() {
// Noop
}
});
assertNull(channel);
}
@Test
public void testRunnableWrappsAbstractAioChannel() throws Exception {
final Object mockChannel = createMock("mockChannel", AbstractAioChannel.class);
replay(mockChannel);
Runnable r = new Runnable() {
@SuppressWarnings("unused")
@Override
public void run() {
Object channel = mockChannel;
// Noop
}
};
AioChannelFinder finder = create();
AbstractAioChannel channel = finder.findChannel(r);
assertNotNull(channel);
AbstractAioChannel channel2 = finder.findChannel(r);
assertNotNull(channel2);
assertSame(channel2, channel);
verify(mockChannel);
reset(mockChannel);
}
@Test
public void testRunnableWrappsRunnable() throws Exception {
final Object mockChannel = createMock("mockChannel", AbstractAioChannel.class);
replay(mockChannel);
final Runnable r = new Runnable() {
@SuppressWarnings("unused")
@Override
public void run() {
Object channel = mockChannel;
// Noop
}
};
Runnable r2 = new Runnable() {
@SuppressWarnings("unused")
@Override
public void run() {
Runnable runnable = r;
// Noop
}
};
AioChannelFinder finder = create();
AbstractAioChannel channel = finder.findChannel(r2);
assertNotNull(channel);
AbstractAioChannel channel2 = finder.findChannel(r2);
assertNotNull(channel2);
assertSame(channel2, channel);
verify(mockChannel);
reset(mockChannel);
}
}

View File

@ -1,33 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
import io.netty.channel.AbstractEventLoopTest;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.aio.AioServerSocketChannel;
public class AioEventLoopTest extends AbstractEventLoopTest {
@Override
protected EventLoopGroup newEventLoopGroup() {
return new AioEventLoopGroup();
}
@Override
protected Class<? extends ServerSocketChannel> newChannel() {
return AioServerSocketChannel.class;
}
}

View File

@ -1,25 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
public class ReflectiveAioChannelFinderTest extends AbstractAioChannelFinderTest {
@Override
protected AioChannelFinder create() {
return new ReflectiveAioChannelFinder();
}
}

View File

@ -1,25 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.aio;
public class UnsafeAioChannelFinderTest extends AbstractAioChannelFinderTest {
@Override
protected AioChannelFinder create() {
return new UnsafeAioChannelFinder();
}
}