From caf91b9c0687bedab40ba06f596b0c2c2f1ae7e3 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 20 Aug 2013 14:37:57 +0900 Subject: [PATCH] Fix IllegalStateException triggered while shutting down ThreadPerChannelEventLoopGroup - Fix #1718 - Add the test case contributed by @mkw --- .../ThreadPerChannelEventLoopGroup.java | 2 +- .../ThreadPerChannelEventLoopGroupTest.java | 114 ++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java index b0a32f42ba..0d3455b98f 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoopGroup.java @@ -57,7 +57,7 @@ public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup i public void operationComplete(Future future) throws Exception { // Inefficient, but works. if (isTerminated()) { - terminationFuture.setSuccess(null); + terminationFuture.trySuccess(null); } } }; diff --git a/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java b/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java new file mode 100644 index 0000000000..f992d8727b --- /dev/null +++ b/transport/src/test/java/io/netty/channel/ThreadPerChannelEventLoopGroupTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel; + +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.SingleThreadEventExecutor; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class ThreadPerChannelEventLoopGroupTest { + + private static final ChannelHandler NOOP_HANDLER = new ChannelHandlerAdapter() { + @Override + public boolean isSharable() { + return true; + } + }; + + @Test + public void testTerminationFutureSuccessInLog() throws Exception { + for (int i = 0; i < 2; i++) { + ThreadPerChannelEventLoopGroup loopGroup = new ThreadPerChannelEventLoopGroup(64); + runTest(loopGroup); + } + } + + @Test + public void testTerminationFutureSuccessReflectively() throws Exception { + Field terminationFutureField = + ThreadPerChannelEventLoopGroup.class.getDeclaredField("terminationFuture"); + terminationFutureField.setAccessible(true); + final Exception[] exceptionHolder = new Exception[1]; + for (int i = 0; i < 2; i++) { + ThreadPerChannelEventLoopGroup loopGroup = new ThreadPerChannelEventLoopGroup(64); + Promise promise = new DefaultPromise(GlobalEventExecutor.INSTANCE) { + @Override + public Promise setSuccess(Void result) { + try { + return super.setSuccess(result); + } catch (IllegalStateException e) { + exceptionHolder[0] = e; + throw e; + } + } + }; + terminationFutureField.set(loopGroup, promise); + runTest(loopGroup); + } + // The global event executor will not terminate, but this will give the test a chance to fail. + GlobalEventExecutor.INSTANCE.awaitTermination(100, TimeUnit.MILLISECONDS); + assertNull(exceptionHolder[0]); + } + + private static void runTest(ThreadPerChannelEventLoopGroup loopGroup) throws InterruptedException { + int taskCount = 100; + EventExecutor testExecutor = new TestEventExecutor(); + ChannelGroup channelGroup = new DefaultChannelGroup(testExecutor); + while (taskCount-- > 0) { + Channel channel = new EmbeddedChannel(NOOP_HANDLER); + loopGroup.register(channel, new DefaultChannelPromise(channel, testExecutor)); + channelGroup.add(channel); + } + channelGroup.close().sync(); + loopGroup.shutdownGracefully(100, 200, TimeUnit.MILLISECONDS).sync(); + assertTrue(loopGroup.isTerminated()); + } + + private static class TestEventExecutor extends SingleThreadEventExecutor { + + public TestEventExecutor() { + super(null, new DefaultThreadFactory("test"), false); + } + + @Override + protected void run() { + for (; ; ) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } + + if (confirmShutdown()) { + break; + } + } + } + } +}