Made the AIO transport faster / Fixed a bug in SingleThreadEventLoopTest

- Used reflection hack to dispatch the tasks submitted by JDK
  efficiently.  Without hack, there's higher chance of additional
  context switches.
- Server side performance improved to the expected level.
- Client side performance issue still under investigation
This commit is contained in:
Trustin Lee 2012-07-08 21:28:56 +09:00
parent 220e4e886f
commit c77f107f5f
6 changed files with 89 additions and 106 deletions

View File

@ -17,7 +17,6 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.aio.AioEventLoop;
import io.netty.channel.socket.aio.AioServerSocketChannel;
@ -55,10 +54,10 @@ final class SocketTestPermutation {
sbfs.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
EventLoop loop = new AioEventLoop();
AioEventLoop loop = new AioEventLoop();
return new ServerBootstrap().
eventLoop(loop, loop).
channel(new AioServerSocketChannel());
channel(new AioServerSocketChannel(loop));
}
});
sbfs.add(new Factory<ServerBootstrap>() {
@ -82,7 +81,8 @@ final class SocketTestPermutation {
cbfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().eventLoop(new AioEventLoop()).channel(new AioSocketChannel());
AioEventLoop loop = new AioEventLoop();
return new Bootstrap().eventLoop(loop).channel(new AioSocketChannel(loop));
}
});
cbfs.add(new Factory<Bootstrap>() {

View File

@ -16,12 +16,18 @@
package io.netty.channel.socket.aio;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoopException;
import io.netty.channel.MultithreadEventLoop;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.concurrent.ThreadFactory;
public class AioEventLoop extends MultithreadEventLoop {
final AsynchronousChannelGroup group;
public AioEventLoop() {
this(0);
}
@ -32,6 +38,66 @@ public class AioEventLoop extends MultithreadEventLoop {
public AioEventLoop(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
try {
group = AsynchronousChannelGroup.withThreadPool(this);
} catch (IOException e) {
throw new EventLoopException("Failed to create an AsynchronousChannelGroup", e);
}
}
@Override
public void execute(Runnable command) {
Class<? extends Runnable> commandType = command.getClass();
if (commandType.getName().startsWith("sun.nio.ch.")) {
executeAioTask(command);
} else {
super.execute(command);
}
}
private void executeAioTask(Runnable command) {
AbstractAioChannel ch = null;
try {
ch = findChannel(command);
} catch (Exception e) {
// Ignore
}
EventExecutor l;
if (ch != null) {
l = ch.eventLoop();
} else {
l = unsafe().nextChild();
}
if (l.isShutdown()) {
command.run();
} else {
ch.eventLoop().execute(command);
}
}
private static AbstractAioChannel findChannel(Runnable command) throws Exception {
// TODO: Optimize me
Class<?> commandType = command.getClass();
for (Field f: commandType.getDeclaredFields()) {
if (f.getType() == Runnable.class) {
f.setAccessible(true);
AbstractAioChannel ch = findChannel((Runnable) f.get(command));
if (ch != null) {
return ch;
}
}
if (f.getType() == Object.class) {
f.setAccessible(true);
Object candidate = f.get(command);
if (candidate instanceof AbstractAioChannel) {
return (AbstractAioChannel) candidate;
}
}
}
return null;
}
@Override

View File

@ -1,76 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.aio;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;
final class AioGroup {
static final AsynchronousChannelGroup GROUP;
static {
AsynchronousChannelGroup group;
try {
group = AsynchronousChannelGroup.withThreadPool(new AioGroupExecutor());
} catch (Exception e) {
throw new Error(e);
}
GROUP = group;
}
private AioGroup() {
// Unused
}
static final class AioGroupExecutor extends AbstractExecutorService {
@Override
public void shutdown() {
// Unstoppable
}
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
Thread.sleep(unit.toMillis(timeout));
return false;
}
@Override
public void execute(Runnable command) {
command.run();
}
}
}

View File

@ -26,6 +26,7 @@ import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
@ -41,16 +42,16 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
private final AioServerSocketChannelConfig config;
private boolean closed;
private static AsynchronousServerSocketChannel newSocket() {
private static AsynchronousServerSocketChannel newSocket(AsynchronousChannelGroup group) {
try {
return AsynchronousServerSocketChannel.open(AioGroup.GROUP);
return AsynchronousServerSocketChannel.open(group);
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
public AioServerSocketChannel() {
super(null, null, newSocket());
public AioServerSocketChannel(AioEventLoop eventLoop) {
super(null, null, newSocket(eventLoop.group));
config = new AioServerSocketChannelConfig(javaChannel());
}

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
@ -40,9 +41,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
private static AsynchronousSocketChannel newSocket() {
private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup group) {
try {
return AsynchronousSocketChannel.open(AioGroup.GROUP);
return AsynchronousSocketChannel.open(group);
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
@ -67,8 +68,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
private final AioSocketChannelConfig config;
private boolean flushing;
public AioSocketChannel() {
this(null, null, newSocket());
public AioSocketChannel(AioEventLoop eventLoop) {
this(null, null, newSocket(eventLoop.group));
}
AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) {
@ -204,7 +205,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (buf.readable()) {
try {
// try to flush it again if nothing is left it will return fast here
// Try to flush it again.
channel.doFlushByteBuffer(buf);
} catch (Exception e) {
// Should never happen, anyway call failed just in case
@ -252,7 +253,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} else if (localReadAmount < 0) {
closed = true;
}
} catch (Throwable t) {
if (read) {
read = false;
@ -274,6 +274,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.unsafe().close(channel.unsafe().voidFuture());
} else {
// start the next read
//channel.readTask.run();
channel.eventLoop().execute(channel.readTask);
}
}
@ -300,11 +301,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void completed0(Void result, AioSocketChannel channel) {
channel.readTask.run();
((AsyncUnsafe) channel.unsafe()).connectSuccess();
channel.pipeline().fireChannelActive();
// start reading from channel
channel.eventLoop().execute(channel.readTask);
}
@Override

View File

@ -63,32 +63,25 @@ public class SingleThreadEventLoopTest {
@Test
public void shutdownAfterStart() throws Exception {
final AtomicBoolean interrupted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(2);
final CountDownLatch latch = new CountDownLatch(1);
loop.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
while (latch.getCount() > 0) {
try {
latch.await();
} catch (InterruptedException ignored) {
interrupted.set(true);
}
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ignored) {
interrupted.set(true);
}
}
});
// Wait for the event loop thread to start.
while (latch.getCount() >= 2) {
Thread.yield();
}
latch.await();
// Request the event loop thread to stop - it will call wakeup(false) to interrupt the thread.
loop.shutdown();
// Make the task terminate by itself.
latch.countDown();
// Wait until the event loop is terminated.
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);