Non-sticky thread groups in DefaultThreadFactory
Motivation: A recent change to DefaultThreadFactory modified it so that it is sticky with respect to thread groups. In particular, this change made it so that DefaultThreadFactory would hold on to the thread group that created it, and then use that thread group to create threads. This can have problematic semantics since it can lead to violations of a tenet of thread groups that a thread can only modify threads in its own thread group and descendant thread groups. With a sticky thread group, a thread triggering the creation of a new thread via DefaultThreadFactory#newThread will be modifying a thread from the sticky thread group which will not necessarily be its own nor a descendant thread group. When a security manager is in place that enforces this requirement, these modifications are now impossible. This is especially problematic in the context of Netty because certain global singletons like GlobalEventExecutor will create a DefaultThreadFactory. If all DefaultThreadFactory instances are sticky about their thread groups, it means that submitting tasks to the GlobalEventExecutor singleton can cause a thread to be created from the DefaultThreadFactory sticky thread group, exactly the problem with DefaultThreadFactory being sticky about thread groups. A similar problem arises from the ThreadDeathWatcher. Modifications: This commit modifies DefaultThreadFactory so that a null thread group can be set with the behavior that all threads created by such an instance will inherit the default thread group (the thread group provided by the security manager if there is one, otherwise the thread group of the creating thread). The construction of the instances of DefaultThreadFactory used by the GlobalEventExecutor singleton and ThreadDeathWatcher are modified to use this behavior. Additionally, we also modify the chained constructor invocations of the DefaultThreadFactory that do not have a parameter to specify a thread group to use the thread group from the security manager is available, otherwise the creating thread's thread group. We also add unit tests ensuring that all of this behavior is maintained. Result: It will be possible to have DefaultThreadFactory instances that are not sticky about the thread group that led to their creation. Instead, threads created by such a DefaultThreadFactory will inherit the default thread group which will either be the thread group from the security manager or the current thread's thread group.
This commit is contained in:
parent
c221d32b92
commit
27520f5208
@ -41,7 +41,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
public final class ThreadDeathWatcher {
|
||||
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
|
||||
private static final ThreadFactory threadFactory;
|
||||
// visible for testing
|
||||
static final ThreadFactory threadFactory;
|
||||
|
||||
private static final Queue<Entry> pendingEntries = PlatformDependent.newMpscQueue();
|
||||
private static final Watcher watcher = new Watcher();
|
||||
@ -54,7 +55,10 @@ public final class ThreadDeathWatcher {
|
||||
if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
|
||||
poolName = serviceThreadPrefix + poolName;
|
||||
}
|
||||
threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY);
|
||||
// because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and
|
||||
// this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory
|
||||
// must not be sticky about its thread group
|
||||
threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -16,7 +16,6 @@
|
||||
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.util.Locale;
|
||||
@ -64,7 +63,7 @@ public class DefaultThreadFactory implements ThreadFactory {
|
||||
this(toPoolName(poolType), daemon, priority);
|
||||
}
|
||||
|
||||
private static String toPoolName(Class<?> poolType) {
|
||||
public static String toPoolName(Class<?> poolType) {
|
||||
if (poolType == null) {
|
||||
throw new NullPointerException("poolType");
|
||||
}
|
||||
@ -96,11 +95,12 @@ public class DefaultThreadFactory implements ThreadFactory {
|
||||
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
|
||||
this.daemon = daemon;
|
||||
this.priority = priority;
|
||||
this.threadGroup = ObjectUtil.checkNotNull(threadGroup, "threadGroup");
|
||||
this.threadGroup = threadGroup;
|
||||
}
|
||||
|
||||
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
|
||||
this(poolName, daemon, priority, Thread.currentThread().getThreadGroup());
|
||||
this(poolName, daemon, priority, System.getSecurityManager() == null ?
|
||||
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -126,7 +126,6 @@ public class DefaultThreadFactory implements ThreadFactory {
|
||||
return t;
|
||||
}
|
||||
|
||||
// TODO: Once we can break the API we should add ThreadGroup to the arguments of this method.
|
||||
protected Thread newThread(Runnable r, String name) {
|
||||
return new FastThreadLocalThread(threadGroup, r, name);
|
||||
}
|
||||
|
@ -49,7 +49,12 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
|
||||
}
|
||||
}, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
|
||||
|
||||
private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
|
||||
// because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
|
||||
// can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
|
||||
// be sticky about its thread group
|
||||
// visible for testing
|
||||
final ThreadFactory threadFactory =
|
||||
new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null);
|
||||
private final TaskRunner taskRunner = new TaskRunner();
|
||||
private final AtomicBoolean started = new AtomicBoolean();
|
||||
volatile Thread thread;
|
||||
|
@ -21,9 +21,12 @@ import org.junit.Test;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class ThreadDeathWatcherTest {
|
||||
|
||||
@ -113,4 +116,25 @@ public class ThreadDeathWatcherTest {
|
||||
// And the task should not run.
|
||||
assertThat(run.get(), is(false));
|
||||
}
|
||||
|
||||
@Test(timeout = 2000)
|
||||
public void testThreadGroup() throws InterruptedException {
|
||||
final ThreadGroup group = new ThreadGroup("group");
|
||||
final AtomicReference<ThreadGroup> capturedGroup = new AtomicReference<ThreadGroup>();
|
||||
final Thread thread = new Thread(group, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
final Thread t = ThreadDeathWatcher.threadFactory.newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
}
|
||||
});
|
||||
capturedGroup.set(t.getThreadGroup());
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
thread.join();
|
||||
|
||||
assertEquals(group, capturedGroup.get());
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,266 @@
|
||||
/*
|
||||
* Copyright 2016 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.security.Permission;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
public class DefaultThreadFactoryTest {
|
||||
@Test(timeout = 2000)
|
||||
public void testDescendantThreadGroups() throws InterruptedException {
|
||||
final SecurityManager current = System.getSecurityManager();
|
||||
|
||||
try {
|
||||
// install security manager that only allows parent thread groups to mess with descendant thread groups
|
||||
System.setSecurityManager(new SecurityManager() {
|
||||
@Override
|
||||
public void checkAccess(ThreadGroup g) {
|
||||
final ThreadGroup source = Thread.currentThread().getThreadGroup();
|
||||
|
||||
if (source != null) {
|
||||
if (!source.parentOf(g)) {
|
||||
throw new SecurityException("source group is not an ancestor of the target group");
|
||||
}
|
||||
super.checkAccess(g);
|
||||
}
|
||||
}
|
||||
|
||||
// so we can restore the security manager at the end of the test
|
||||
@Override
|
||||
public void checkPermission(Permission perm) {
|
||||
}
|
||||
});
|
||||
|
||||
// holder for the thread factory, plays the role of a global singleton
|
||||
final AtomicReference<DefaultThreadFactory> factory = new AtomicReference<DefaultThreadFactory>();
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
final Runnable task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
||||
final AtomicReference<Throwable> interrupted = new AtomicReference<Throwable>();
|
||||
|
||||
// create the thread factory, since we are running the thread group brother, the thread
|
||||
// factory will now forever be tied to that group
|
||||
// we then create a thread from the factory to run a "task" for us
|
||||
final Thread first = new Thread(new ThreadGroup("brother"), new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
factory.set(new DefaultThreadFactory("test", false, Thread.NORM_PRIORITY, null));
|
||||
final Thread t = factory.get().newThread(task);
|
||||
t.start();
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
interrupted.set(e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
});
|
||||
first.start();
|
||||
first.join();
|
||||
|
||||
assertNull(interrupted.get());
|
||||
|
||||
// now we will use factory again, this time from a sibling thread group sister
|
||||
// if DefaultThreadFactory is "sticky" about thread groups, a security manager
|
||||
// that forbids sibling thread groups from messing with each other will strike this down
|
||||
final Thread second = new Thread(new ThreadGroup("sister"), new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
final Thread t = factory.get().newThread(task);
|
||||
t.start();
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
interrupted.set(e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
});
|
||||
second.start();
|
||||
second.join();
|
||||
|
||||
assertNull(interrupted.get());
|
||||
|
||||
assertEquals(2, counter.get());
|
||||
} finally {
|
||||
System.setSecurityManager(current);
|
||||
}
|
||||
}
|
||||
|
||||
// test that when DefaultThreadFactory is constructed with a sticky thread group, threads
|
||||
// created by it have the sticky thread group
|
||||
@Test(timeout = 2000)
|
||||
public void testDefaultThreadFactoryStickyThreadGroupConstructor() throws InterruptedException {
|
||||
final ThreadGroup sticky = new ThreadGroup("sticky");
|
||||
runStickyThreadGroupTest(
|
||||
new Callable<DefaultThreadFactory>() {
|
||||
@Override
|
||||
public DefaultThreadFactory call() throws Exception {
|
||||
return new DefaultThreadFactory("test", false, Thread.NORM_PRIORITY, sticky);
|
||||
}
|
||||
},
|
||||
sticky);
|
||||
}
|
||||
|
||||
// test that when DefaultThreadFactory is constructed it is sticky to the thread group from the thread group of the
|
||||
// thread that created it
|
||||
@Test(timeout = 2000)
|
||||
public void testDefaulThreadFactoryInheritsThreadGroup() throws InterruptedException {
|
||||
final ThreadGroup sticky = new ThreadGroup("sticky");
|
||||
|
||||
runStickyThreadGroupTest(
|
||||
new Callable<DefaultThreadFactory>() {
|
||||
@Override
|
||||
public DefaultThreadFactory call() throws Exception {
|
||||
final AtomicReference<DefaultThreadFactory> factory =
|
||||
new AtomicReference<DefaultThreadFactory>();
|
||||
final Thread thread = new Thread(sticky, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
factory.set(new DefaultThreadFactory("test"));
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
thread.join();
|
||||
return factory.get();
|
||||
}
|
||||
},
|
||||
sticky);
|
||||
}
|
||||
|
||||
// test that when a security manager is installed that provides a ThreadGroup, DefaultThreadFactory inherits from
|
||||
// the security manager
|
||||
@Test(timeout = 2000)
|
||||
public void testDefaultThreadFactoryInheritsThreadGroupFromSecurityManager() throws InterruptedException {
|
||||
final SecurityManager current = System.getSecurityManager();
|
||||
|
||||
try {
|
||||
final ThreadGroup sticky = new ThreadGroup("sticky");
|
||||
System.setSecurityManager(new SecurityManager() {
|
||||
@Override
|
||||
public ThreadGroup getThreadGroup() {
|
||||
return sticky;
|
||||
}
|
||||
|
||||
// so we can restore the security manager at the end of the test
|
||||
@Override
|
||||
public void checkPermission(Permission perm) {
|
||||
}
|
||||
});
|
||||
runStickyThreadGroupTest(
|
||||
new Callable<DefaultThreadFactory>() {
|
||||
@Override
|
||||
public DefaultThreadFactory call() throws Exception {
|
||||
return new DefaultThreadFactory("test");
|
||||
}
|
||||
},
|
||||
sticky);
|
||||
} finally {
|
||||
System.setSecurityManager(current);
|
||||
}
|
||||
}
|
||||
|
||||
private void runStickyThreadGroupTest(
|
||||
final Callable<DefaultThreadFactory> callable,
|
||||
final ThreadGroup expected) throws InterruptedException {
|
||||
final AtomicReference<ThreadGroup> captured = new AtomicReference<ThreadGroup>();
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
|
||||
final Thread first = new Thread(new ThreadGroup("wrong"), new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
final DefaultThreadFactory factory;
|
||||
try {
|
||||
factory = callable.call();
|
||||
} catch (Exception e) {
|
||||
exception.set(e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
final Thread t = factory.newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
}
|
||||
});
|
||||
captured.set(t.getThreadGroup());
|
||||
}
|
||||
});
|
||||
first.start();
|
||||
first.join();
|
||||
|
||||
assertNull(exception.get());
|
||||
|
||||
assertEquals(expected, captured.get());
|
||||
}
|
||||
|
||||
// test that when DefaultThreadFactory is constructed without a sticky thread group, threads
|
||||
// created by it inherit the correct thread group
|
||||
@Test(timeout = 2000)
|
||||
public void testDefaultThreadFactoryNonStickyThreadGroupConstructor() throws InterruptedException {
|
||||
|
||||
final AtomicReference<DefaultThreadFactory> factory = new AtomicReference<DefaultThreadFactory>();
|
||||
final AtomicReference<ThreadGroup> firstCaptured = new AtomicReference<ThreadGroup>();
|
||||
|
||||
final ThreadGroup firstGroup = new ThreadGroup("first");
|
||||
final Thread first = new Thread(firstGroup, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
factory.set(new DefaultThreadFactory("sticky", false, Thread.NORM_PRIORITY, null));
|
||||
final Thread t = factory.get().newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
}
|
||||
});
|
||||
firstCaptured.set(t.getThreadGroup());
|
||||
}
|
||||
});
|
||||
first.start();
|
||||
first.join();
|
||||
|
||||
assertEquals(firstGroup, firstCaptured.get());
|
||||
|
||||
final AtomicReference<ThreadGroup> secondCaptured = new AtomicReference<ThreadGroup>();
|
||||
|
||||
final ThreadGroup secondGroup = new ThreadGroup("second");
|
||||
final Thread second = new Thread(secondGroup, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
final Thread t = factory.get().newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
}
|
||||
});
|
||||
secondCaptured.set(t.getThreadGroup());
|
||||
}
|
||||
});
|
||||
second.start();
|
||||
second.join();
|
||||
|
||||
assertEquals(secondGroup, secondCaptured.get());
|
||||
}
|
||||
}
|
@ -21,9 +21,14 @@ import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.CoreMatchers.sameInstance;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class GlobalEventExecutorTest {
|
||||
|
||||
@ -91,6 +96,29 @@ public class GlobalEventExecutorTest {
|
||||
assertThat(e.thread, sameInstance(thread));
|
||||
}
|
||||
|
||||
// ensure that when a task submission causes a new thread to be created, the thread inherits the thread group of the
|
||||
// submitting thread
|
||||
@Test(timeout = 2000)
|
||||
public void testThreadGroup() throws InterruptedException {
|
||||
final ThreadGroup group = new ThreadGroup("group");
|
||||
final AtomicReference<ThreadGroup> capturedGroup = new AtomicReference<ThreadGroup>();
|
||||
final Thread thread = new Thread(group, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
final Thread t = e.threadFactory.newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
}
|
||||
});
|
||||
capturedGroup.set(t.getThreadGroup());
|
||||
}
|
||||
});
|
||||
thread.start();
|
||||
thread.join();
|
||||
|
||||
assertEquals(group, capturedGroup.get());
|
||||
}
|
||||
|
||||
private static final class TestRunnable implements Runnable {
|
||||
final AtomicBoolean ran = new AtomicBoolean();
|
||||
final long delay;
|
||||
|
Loading…
Reference in New Issue
Block a user