diff --git a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java index 00ded85f8a..d43b6bf4af 100644 --- a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java +++ b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java @@ -41,7 +41,8 @@ import java.util.concurrent.atomic.AtomicBoolean; public final class ThreadDeathWatcher { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class); - private static final ThreadFactory threadFactory; + // visible for testing + static final ThreadFactory threadFactory; private static final Queue pendingEntries = PlatformDependent.newMpscQueue(); private static final Watcher watcher = new Watcher(); @@ -54,7 +55,10 @@ public final class ThreadDeathWatcher { if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) { poolName = serviceThreadPrefix + poolName; } - threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY); + // because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and + // this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory + // must not be sticky about its thread group + threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null); } /** diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java b/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java index c0c56bf1b2..42f32f663e 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java @@ -16,7 +16,6 @@ package io.netty.util.concurrent; -import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; import java.util.Locale; @@ -64,7 +63,7 @@ public class DefaultThreadFactory implements ThreadFactory { this(toPoolName(poolType), daemon, priority); } - private static String toPoolName(Class poolType) { + public static String toPoolName(Class poolType) { if (poolType == null) { throw new NullPointerException("poolType"); } @@ -96,11 +95,12 @@ public class DefaultThreadFactory implements ThreadFactory { prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; - this.threadGroup = ObjectUtil.checkNotNull(threadGroup, "threadGroup"); + this.threadGroup = threadGroup; } public DefaultThreadFactory(String poolName, boolean daemon, int priority) { - this(poolName, daemon, priority, Thread.currentThread().getThreadGroup()); + this(poolName, daemon, priority, System.getSecurityManager() == null ? + Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); } @Override @@ -126,7 +126,6 @@ public class DefaultThreadFactory implements ThreadFactory { return t; } - // TODO: Once we can break the API we should add ThreadGroup to the arguments of this method. protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); } diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index c035c94ee3..e1f7679e5a 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -49,7 +49,12 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { } }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL); - private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass()); + // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this + // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not + // be sticky about its thread group + // visible for testing + final ThreadFactory threadFactory = + new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null); private final TaskRunner taskRunner = new TaskRunner(); private final AtomicBoolean started = new AtomicBoolean(); volatile Thread thread; diff --git a/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java b/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java index 9dd088bac7..7574274f64 100644 --- a/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java +++ b/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java @@ -21,9 +21,12 @@ import org.junit.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; public class ThreadDeathWatcherTest { @@ -113,4 +116,25 @@ public class ThreadDeathWatcherTest { // And the task should not run. assertThat(run.get(), is(false)); } + + @Test(timeout = 2000) + public void testThreadGroup() throws InterruptedException { + final ThreadGroup group = new ThreadGroup("group"); + final AtomicReference capturedGroup = new AtomicReference(); + final Thread thread = new Thread(group, new Runnable() { + @Override + public void run() { + final Thread t = ThreadDeathWatcher.threadFactory.newThread(new Runnable() { + @Override + public void run() { + } + }); + capturedGroup.set(t.getThreadGroup()); + } + }); + thread.start(); + thread.join(); + + assertEquals(group, capturedGroup.get()); + } } diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultThreadFactoryTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultThreadFactoryTest.java new file mode 100644 index 0000000000..05c677e06e --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/DefaultThreadFactoryTest.java @@ -0,0 +1,266 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class DefaultThreadFactoryTest { + @Test(timeout = 2000) + public void testDescendantThreadGroups() throws InterruptedException { + final SecurityManager current = System.getSecurityManager(); + + try { + // install security manager that only allows parent thread groups to mess with descendant thread groups + System.setSecurityManager(new SecurityManager() { + @Override + public void checkAccess(ThreadGroup g) { + final ThreadGroup source = Thread.currentThread().getThreadGroup(); + + if (source != null) { + if (!source.parentOf(g)) { + throw new SecurityException("source group is not an ancestor of the target group"); + } + super.checkAccess(g); + } + } + + // so we can restore the security manager at the end of the test + @Override + public void checkPermission(Permission perm) { + } + }); + + // holder for the thread factory, plays the role of a global singleton + final AtomicReference factory = new AtomicReference(); + final AtomicInteger counter = new AtomicInteger(); + final Runnable task = new Runnable() { + @Override + public void run() { + counter.incrementAndGet(); + } + }; + + final AtomicReference interrupted = new AtomicReference(); + + // create the thread factory, since we are running the thread group brother, the thread + // factory will now forever be tied to that group + // we then create a thread from the factory to run a "task" for us + final Thread first = new Thread(new ThreadGroup("brother"), new Runnable() { + @Override + public void run() { + factory.set(new DefaultThreadFactory("test", false, Thread.NORM_PRIORITY, null)); + final Thread t = factory.get().newThread(task); + t.start(); + try { + t.join(); + } catch (InterruptedException e) { + interrupted.set(e); + Thread.currentThread().interrupt(); + } + } + }); + first.start(); + first.join(); + + assertNull(interrupted.get()); + + // now we will use factory again, this time from a sibling thread group sister + // if DefaultThreadFactory is "sticky" about thread groups, a security manager + // that forbids sibling thread groups from messing with each other will strike this down + final Thread second = new Thread(new ThreadGroup("sister"), new Runnable() { + @Override + public void run() { + final Thread t = factory.get().newThread(task); + t.start(); + try { + t.join(); + } catch (InterruptedException e) { + interrupted.set(e); + Thread.currentThread().interrupt(); + } + } + }); + second.start(); + second.join(); + + assertNull(interrupted.get()); + + assertEquals(2, counter.get()); + } finally { + System.setSecurityManager(current); + } + } + + // test that when DefaultThreadFactory is constructed with a sticky thread group, threads + // created by it have the sticky thread group + @Test(timeout = 2000) + public void testDefaultThreadFactoryStickyThreadGroupConstructor() throws InterruptedException { + final ThreadGroup sticky = new ThreadGroup("sticky"); + runStickyThreadGroupTest( + new Callable() { + @Override + public DefaultThreadFactory call() throws Exception { + return new DefaultThreadFactory("test", false, Thread.NORM_PRIORITY, sticky); + } + }, + sticky); + } + + // test that when DefaultThreadFactory is constructed it is sticky to the thread group from the thread group of the + // thread that created it + @Test(timeout = 2000) + public void testDefaulThreadFactoryInheritsThreadGroup() throws InterruptedException { + final ThreadGroup sticky = new ThreadGroup("sticky"); + + runStickyThreadGroupTest( + new Callable() { + @Override + public DefaultThreadFactory call() throws Exception { + final AtomicReference factory = + new AtomicReference(); + final Thread thread = new Thread(sticky, new Runnable() { + @Override + public void run() { + factory.set(new DefaultThreadFactory("test")); + } + }); + thread.start(); + thread.join(); + return factory.get(); + } + }, + sticky); + } + + // test that when a security manager is installed that provides a ThreadGroup, DefaultThreadFactory inherits from + // the security manager + @Test(timeout = 2000) + public void testDefaultThreadFactoryInheritsThreadGroupFromSecurityManager() throws InterruptedException { + final SecurityManager current = System.getSecurityManager(); + + try { + final ThreadGroup sticky = new ThreadGroup("sticky"); + System.setSecurityManager(new SecurityManager() { + @Override + public ThreadGroup getThreadGroup() { + return sticky; + } + + // so we can restore the security manager at the end of the test + @Override + public void checkPermission(Permission perm) { + } + }); + runStickyThreadGroupTest( + new Callable() { + @Override + public DefaultThreadFactory call() throws Exception { + return new DefaultThreadFactory("test"); + } + }, + sticky); + } finally { + System.setSecurityManager(current); + } + } + + private void runStickyThreadGroupTest( + final Callable callable, + final ThreadGroup expected) throws InterruptedException { + final AtomicReference captured = new AtomicReference(); + final AtomicReference exception = new AtomicReference(); + + final Thread first = new Thread(new ThreadGroup("wrong"), new Runnable() { + @Override + public void run() { + final DefaultThreadFactory factory; + try { + factory = callable.call(); + } catch (Exception e) { + exception.set(e); + throw new RuntimeException(e); + } + final Thread t = factory.newThread(new Runnable() { + @Override + public void run() { + } + }); + captured.set(t.getThreadGroup()); + } + }); + first.start(); + first.join(); + + assertNull(exception.get()); + + assertEquals(expected, captured.get()); + } + + // test that when DefaultThreadFactory is constructed without a sticky thread group, threads + // created by it inherit the correct thread group + @Test(timeout = 2000) + public void testDefaultThreadFactoryNonStickyThreadGroupConstructor() throws InterruptedException { + + final AtomicReference factory = new AtomicReference(); + final AtomicReference firstCaptured = new AtomicReference(); + + final ThreadGroup firstGroup = new ThreadGroup("first"); + final Thread first = new Thread(firstGroup, new Runnable() { + @Override + public void run() { + factory.set(new DefaultThreadFactory("sticky", false, Thread.NORM_PRIORITY, null)); + final Thread t = factory.get().newThread(new Runnable() { + @Override + public void run() { + } + }); + firstCaptured.set(t.getThreadGroup()); + } + }); + first.start(); + first.join(); + + assertEquals(firstGroup, firstCaptured.get()); + + final AtomicReference secondCaptured = new AtomicReference(); + + final ThreadGroup secondGroup = new ThreadGroup("second"); + final Thread second = new Thread(secondGroup, new Runnable() { + @Override + public void run() { + final Thread t = factory.get().newThread(new Runnable() { + @Override + public void run() { + } + }); + secondCaptured.set(t.getThreadGroup()); + } + }); + second.start(); + second.join(); + + assertEquals(secondGroup, secondCaptured.get()); + } +} diff --git a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java index ca37073301..9c9c3898b1 100644 --- a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java @@ -21,9 +21,14 @@ import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; public class GlobalEventExecutorTest { @@ -91,6 +96,29 @@ public class GlobalEventExecutorTest { assertThat(e.thread, sameInstance(thread)); } + // ensure that when a task submission causes a new thread to be created, the thread inherits the thread group of the + // submitting thread + @Test(timeout = 2000) + public void testThreadGroup() throws InterruptedException { + final ThreadGroup group = new ThreadGroup("group"); + final AtomicReference capturedGroup = new AtomicReference(); + final Thread thread = new Thread(group, new Runnable() { + @Override + public void run() { + final Thread t = e.threadFactory.newThread(new Runnable() { + @Override + public void run() { + } + }); + capturedGroup.set(t.getThreadGroup()); + } + }); + thread.start(); + thread.join(); + + assertEquals(group, capturedGroup.get()); + } + private static final class TestRunnable implements Runnable { final AtomicBoolean ran = new AtomicBoolean(); final long delay;