Allow to get details of the Thread that powers a SingleThreadEventExecutor.

Motivation:

for debugging and metrics reasons its sometimes useful to be able to get details of the the Thread that powers a SingleThreadEventExecutor.

Modifications:

- Expose ThreadProperties
- Add unit test.

Result:

It's now possible to get details of the Thread that powers a SingleThreadEventExecutor.
This commit is contained in:
Norman Maurer 2015-08-25 12:20:23 +02:00
parent 1a9ea2d349
commit 6d473e7f39
3 changed files with 207 additions and 0 deletions

View File

@ -19,6 +19,7 @@ import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
@ -32,6 +33,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
@ -54,8 +56,15 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
// Do nothing.
}
};
private static final Runnable NOOP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER;
static {
AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =
@ -64,11 +73,20 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
}
STATE_UPDATER = updater;
AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> propertiesUpdater =
PlatformDependent.newAtomicReferenceFieldUpdater(SingleThreadEventExecutor.class, "threadProperties");
if (propertiesUpdater == null) {
propertiesUpdater = AtomicReferenceFieldUpdater.newUpdater(SingleThreadEventExecutor.class,
ThreadProperties.class, "threadProperties");
}
PROPERTIES_UPDATER = propertiesUpdater;
}
private final Queue<Runnable> taskQueue;
private volatile Thread thread;
private volatile ThreadProperties threadProperties;
private final Executor executor;
private volatile boolean interrupted;
@ -662,6 +680,31 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
}
}
/**
* Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
* If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until the
* it is fully started.
*/
public final ThreadProperties threadProperties() {
ThreadProperties threadProperties = this.threadProperties;
if (threadProperties == null) {
Thread thread = this.thread;
if (thread == null) {
assert !inEventLoop();
submit(NOOP_TASK).syncUninterruptibly();
thread = this.thread;
assert thread != null;
}
threadProperties = new DefaultThreadProperties(thread);
if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
threadProperties = this.threadProperties;
}
}
return threadProperties;
}
@SuppressWarnings("unused")
protected boolean wakesUpForTask(Runnable task) {
return true;
@ -742,4 +785,52 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
}
});
}
private static final class DefaultThreadProperties implements ThreadProperties {
private final Thread t;
DefaultThreadProperties(Thread t) {
this.t = t;
}
@Override
public State state() {
return t.getState();
}
@Override
public int priority() {
return t.getPriority();
}
@Override
public boolean isInterrupted() {
return t.isInterrupted();
}
@Override
public boolean isDaemon() {
return t.isDaemon();
}
@Override
public String name() {
return t.getName();
}
@Override
public long id() {
return t.getId();
}
@Override
public StackTraceElement[] stackTrace() {
return t.getStackTrace();
}
@Override
public boolean isAlive() {
return t.isAlive();
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
/**
* Expose details for a {@link Thread}.
*/
public interface ThreadProperties {
/**
* @see {@link Thread#getState()}.
*/
Thread.State state();
/**
* @see {@link Thread#getPriority()}.
*/
int priority();
/**
* @see {@link Thread#isInterrupted()}.
*/
boolean isInterrupted();
/**
* @see {@link Thread#isDaemon()} ()}.
*/
boolean isDaemon();
/**
* @see {@link Thread#getName()} ()}.
*/
String name();
/**
* @see {@link Thread#getId()}.
*/
long id();
/**
* @see {@link Thread#getStackTrace()}.
*/
StackTraceElement[] stackTrace();
/**
* @see {@link Thread#isAlive()}.
*/
boolean isAlive();
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicReference;
public class SingleThreadEventExecutorTest {
@Test
public void testThreadDetails() {
final AtomicReference<Thread> threadRef = new AtomicReference<Thread>();
SingleThreadEventExecutor executor = new SingleThreadEventExecutor(
null, new DefaultThreadFactory("test"), false) {
@Override
protected void run() {
threadRef.set(Thread.currentThread());
while (!confirmShutdown()) {
Runnable task = takeTask();
if (task != null) {
task.run();
}
}
}
};
ThreadProperties threadProperties = executor.threadProperties();
Assert.assertSame(threadProperties, executor.threadProperties());
Thread thread = threadRef.get();
Assert.assertEquals(thread.getId(), threadProperties.id());
Assert.assertEquals(thread.getName(), threadProperties.name());
Assert.assertEquals(thread.getPriority(), threadProperties.priority());
Assert.assertEquals(thread.getState(), threadProperties.state());
Assert.assertEquals(thread.isAlive(), threadProperties.isAlive());
Assert.assertEquals(thread.isDaemon(), threadProperties.isDaemon());
Assert.assertEquals(thread.isInterrupted(), threadProperties.isInterrupted());
Assert.assertTrue(threadProperties.stackTrace().length > 0);
executor.shutdownGracefully();
}
}