diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index f5732067ae..13bbfde258 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -442,7 +442,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im } protected void wakeup(boolean inEventLoop) { - if (!inEventLoop || state == ST_SHUTTING_DOWN) { + if (!inEventLoop) { // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there // is already something in the queue. taskQueue.offer(WAKEUP_TASK); @@ -551,7 +551,10 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im } if (wakeup) { - wakeup(inEventLoop); + taskQueue.offer(WAKEUP_TASK); + if (!addTaskWakesUp) { + wakeup(inEventLoop); + } } return terminationFuture(); @@ -603,7 +606,10 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im } if (wakeup) { - wakeup(inEventLoop); + taskQueue.offer(WAKEUP_TASK); + if (!addTaskWakesUp) { + wakeup(inEventLoop); + } } } @@ -652,7 +658,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im if (gracefulShutdownQuietPeriod == 0) { return true; } - wakeup(true); + taskQueue.offer(WAKEUP_TASK); return false; } @@ -665,7 +671,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. - wakeup(true); + taskQueue.offer(WAKEUP_TASK); try { Thread.sleep(100); } catch (InterruptedException e) { diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java new file mode 100644 index 0000000000..4b7ef60383 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.MultithreadEventLoopGroup; +import org.junit.Test; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalServerChannel; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; + +public abstract class AbstractSingleThreadEventLoopTest { + + @Test + @SuppressWarnings("deprecation") + public void shutdownBeforeStart() throws Exception { + EventLoopGroup group = new MultithreadEventLoopGroup(newIoHandlerFactory()); + assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS)); + group.shutdown(); + assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS)); + } + + @Test + public void shutdownGracefullyZeroQuietBeforeStart() throws Exception { + EventLoopGroup group = new MultithreadEventLoopGroup(newIoHandlerFactory()); + assertTrue(group.shutdownGracefully(0L, 2L, TimeUnit.SECONDS).await(200L)); + } + + // Copied from AbstractEventLoopTest + @Test(timeout = 5000) + public void testShutdownGracefullyNoQuietPeriod() throws Exception { + EventLoopGroup loop = new MultithreadEventLoopGroup(newIoHandlerFactory()); + ServerBootstrap b = new ServerBootstrap(); + b.group(loop) + .channel(serverChannelClass()) + .childHandler(new ChannelHandler() { }); + + // Not close the Channel to ensure the EventLoop is still shutdown in time. + ChannelFuture cf = serverChannelClass() == LocalServerChannel.class + ? b.bind(new LocalAddress("local")) : b.bind(0); + cf.sync().channel(); + + Future f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES); + assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS)); + assertTrue(f.syncUninterruptibly().isSuccess()); + assertTrue(loop.isShutdown()); + assertTrue(loop.isTerminated()); + } + + @Test + public void shutdownGracefullyBeforeStart() throws Exception { + EventLoopGroup group = new MultithreadEventLoopGroup(newIoHandlerFactory()); + assertTrue(group.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS).await(500L)); + } + + @Test + public void gracefulShutdownAfterStart() throws Exception { + EventLoop loop = new MultithreadEventLoopGroup(newIoHandlerFactory()).next(); + final CountDownLatch latch = new CountDownLatch(1); + loop.execute(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); + + // Wait for the event loop thread to start. + latch.await(); + + // Request the event loop thread to stop. + loop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS); + + // Wait until the event loop is terminated. + assertTrue(loop.awaitTermination(500L, TimeUnit.MILLISECONDS)); + + assertRejection(loop); + } + + private static final Runnable NOOP = new Runnable() { + @Override + public void run() { } + }; + + private static void assertRejection(EventExecutor loop) { + try { + loop.execute(NOOP); + fail("A task must be rejected after shutdown() is called."); + } catch (RejectedExecutionException e) { + // Expected + } + } + + protected abstract IoHandlerFactory newIoHandlerFactory(); + protected abstract Class serverChannelClass(); +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/LocalSingleThreadEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/LocalSingleThreadEventLoopTest.java new file mode 100644 index 0000000000..556b937866 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/LocalSingleThreadEventLoopTest.java @@ -0,0 +1,33 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport; + +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.ServerChannel; +import io.netty.channel.local.LocalHandler; +import io.netty.channel.local.LocalServerChannel; + +public class LocalSingleThreadEventLoopTest extends AbstractSingleThreadEventLoopTest { + @Override + protected IoHandlerFactory newIoHandlerFactory() { + return LocalHandler.newFactory(); + } + + @Override + protected Class serverChannelClass() { + return LocalServerChannel.class; + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/NioSingleThreadEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/NioSingleThreadEventLoopTest.java new file mode 100644 index 0000000000..ff1154bd43 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/NioSingleThreadEventLoopTest.java @@ -0,0 +1,33 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport; + +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.ServerChannel; +import io.netty.channel.nio.NioHandler; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +public class NioSingleThreadEventLoopTest extends AbstractSingleThreadEventLoopTest { + @Override + protected IoHandlerFactory newIoHandlerFactory() { + return NioHandler.newFactory(); + } + + @Override + protected Class serverChannelClass() { + return NioServerSocketChannel.class; + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java index e2da34b61c..1aef16f893 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java @@ -18,8 +18,11 @@ package io.netty.channel.epoll; import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.ServerChannel; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.unix.FileDescriptor; +import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ThreadPerTaskExecutor; @@ -35,7 +38,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class EpollEventLoopTest { +public class EpollEventLoopTest extends AbstractSingleThreadEventLoopTest { @Test public void testScheduleBigDelayNotOverflow() { @@ -111,4 +114,14 @@ public class EpollEventLoopTest { timerFd.close(); } } + + @Override + protected IoHandlerFactory newIoHandlerFactory() { + return EpollHandler.newFactory(); + } + + @Override + protected Class serverChannelClass() { + return EpollServerSocketChannel.class; + } } diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java index b8ce7e3514..bae1f2c8a7 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java @@ -17,7 +17,10 @@ package io.netty.channel.kqueue; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.IoHandlerFactory; import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; import io.netty.util.concurrent.Future; import org.junit.Test; @@ -26,11 +29,11 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class KQueueEventLoopTest { +public class KQueueEventLoopTest extends AbstractSingleThreadEventLoopTest { @Test public void testScheduleBigDelayNotOverflow() { - EventLoopGroup group = new MultithreadEventLoopGroup(1, KQueueHandler.newFactory()); + EventLoopGroup group = new MultithreadEventLoopGroup(1, newIoHandlerFactory()); final EventLoop el = group.next(); Future future = el.schedule(() -> { @@ -41,4 +44,14 @@ public class KQueueEventLoopTest { assertTrue(future.cancel(true)); group.shutdownGracefully(); } + + @Override + protected IoHandlerFactory newIoHandlerFactory() { + return KQueueHandler.newFactory(); + } + + @Override + protected Class serverChannelClass() { + return KQueueServerSocketChannel.class; + } } diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java index 31e5c759fe..6a67833f84 100644 --- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java @@ -38,7 +38,7 @@ public abstract class AbstractEventLoopTest { b.bind(0).sync().channel(); Future f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES); - assertTrue(loop.awaitTermination(2, TimeUnit.SECONDS)); + assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS)); assertTrue(f.syncUninterruptibly().isSuccess()); assertTrue(loop.isShutdown()); assertTrue(loop.isTerminated());