[#662] Fix race in AioEventLoopGroup

- Ensure the event loop threads are never terminated before all tasks
  submitted by JDK are executed
- Close all open connections before terminating an event loop
This commit is contained in:
Trustin Lee 2012-10-23 15:07:13 -07:00
parent 5d51aed846
commit c38c1d0e6f
2 changed files with 87 additions and 9 deletions

View File

@ -15,17 +15,57 @@
*/
package io.netty.channel.socket.aio;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.channel.SingleThreadEventLoop;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
final class AioEventLoop extends SingleThreadEventLoop {
private final Set<Channel> channels = Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>());
private final ChannelFutureListener registrationListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
return;
}
Channel ch = future.channel();
channels.add(ch);
ch.closeFuture().addListener(deregistrationListener);
}
};
private final ChannelFutureListener deregistrationListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channels.remove(future.channel());
}
};
AioEventLoop(AioEventLoopGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
}
@Override
public ChannelFuture register(Channel channel) {
return super.register(channel).addListener(registrationListener);
}
@Override
public ChannelFuture register(Channel channel, ChannelFuture future) {
return super.register(channel, future).addListener(registrationListener);
}
@Override
protected void run() {
for (;;) {
@ -38,6 +78,7 @@ final class AioEventLoop extends SingleThreadEventLoop {
}
if (isShutdown()) {
closeAll();
task = pollTask();
if (task == null) {
break;
@ -47,6 +88,17 @@ final class AioEventLoop extends SingleThreadEventLoop {
}
}
private void closeAll() {
Collection<Channel> channels = new ArrayList<Channel>(this.channels.size());
for (Channel ch: this.channels) {
channels.add(ch);
}
for (Channel ch: channels) {
ch.unsafe().close(ch.unsafe().voidFuture());
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && isShutdown()) {

View File

@ -28,6 +28,7 @@ import java.nio.channels.AsynchronousChannelGroup;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -52,6 +53,7 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
CHANNEL_FINDER = finder;
}
private final AioExecutorService groupExecutor = new AioExecutorService();
final AsynchronousChannelGroup group;
public AioEventLoopGroup() {
@ -65,7 +67,7 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
public AioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
try {
group = AsynchronousChannelGroup.withThreadPool(new AioExecutorService());
group = AsynchronousChannelGroup.withThreadPool(groupExecutor);
} catch (IOException e) {
throw new EventLoopException("Failed to create an AsynchronousChannelGroup", e);
}
@ -73,9 +75,29 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
public void shutdown() {
super.shutdown();
// Also shutdown the underlying AsynchrounsChannelGroup
boolean interrupted = false;
// Tell JDK not to accept any more registration request. Note that the threads are not really shut down yet.
group.shutdown();
// Wait until JDK propagates the shutdown request on AsynchronousChannelGroup to the ExecutorService.
// JDK will probably submit some final tasks to the ExecutorService before shutting down the ExecutorService,
// so we have to ensure all tasks submitted by JDK are executed before calling super.shutdown() to really
// shut down event loop threads.
while (!groupExecutor.isTerminated()) {
try {
groupExecutor.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
interrupted = true;
}
}
// Close all connections and shut down event loop threads.
super.shutdown();
if (interrupted) {
Thread.currentThread().interrupt();
}
}
@Override
@ -108,30 +130,34 @@ public class AioEventLoopGroup extends MultithreadEventLoopGroup {
private final class AioExecutorService extends AbstractExecutorService {
// It does not shut down the underlying EventExecutor - it merely pretends to be shut down.
// The actual shut down is done by EventLoopGroup and EventLoop implementation.
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void shutdown() {
AioEventLoopGroup.this.shutdown();
latch.countDown();
}
@Override
public List<Runnable> shutdownNow() {
AioEventLoopGroup.this.shutdown();
shutdown();
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return AioEventLoopGroup.this.isShutdown();
return latch.getCount() == 0;
}
@Override
public boolean isTerminated() {
return AioEventLoopGroup.this.isTerminated();
return isShutdown();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return AioEventLoopGroup.this.awaitTermination(timeout, unit);
return latch.await(timeout, unit);
}
@Override