Introduce ThreadDeathWatcher
Motivation: PooledByteBufAllocator's thread local cache and ReferenceCountUtil.releaseLater() are in need of a way to run an arbitrary logic when a certain thread is terminated. Modifications: - Add ThreadDeathWatcher, which spawns a low-priority daemon thread that watches a list of threads periodically (every second) and invokes the specified tasks when the associated threads are not alive anymore - Start-stop logic based on CAS operation proposed by @tea-dragon - Add debug-level log messages to see if ThreadDeathWatcher works Result: - Fixes #2519 because we don't use GlobalEventExecutor anymore - Cleaner code
This commit is contained in:
parent
35cc23ee8b
commit
642f4bb3b1
@ -192,29 +192,32 @@ final class PoolThreadCache {
|
|||||||
/**
|
/**
|
||||||
* Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
|
* Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
|
||||||
*/
|
*/
|
||||||
void free() {
|
int free() {
|
||||||
free(tinySubPageDirectCaches);
|
return free(tinySubPageDirectCaches) +
|
||||||
free(smallSubPageDirectCaches);
|
free(smallSubPageDirectCaches) +
|
||||||
free(normalDirectCaches);
|
free(normalDirectCaches) +
|
||||||
free(tinySubPageHeapCaches);
|
free(tinySubPageHeapCaches) +
|
||||||
free(smallSubPageHeapCaches);
|
free(smallSubPageHeapCaches) +
|
||||||
free(normalHeapCaches);
|
free(normalHeapCaches);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void free(MemoryRegionCache<?>[] caches) {
|
private static int free(MemoryRegionCache<?>[] caches) {
|
||||||
if (caches == null) {
|
if (caches == null) {
|
||||||
return;
|
return 0;
|
||||||
}
|
|
||||||
for (int i = 0; i < caches.length; i++) {
|
|
||||||
free(caches[i]);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void free(MemoryRegionCache<?> cache) {
|
int numFreed = 0;
|
||||||
if (cache == null) {
|
for (int i = 0; i < caches.length; i++) {
|
||||||
return;
|
numFreed += free(caches[i]);
|
||||||
}
|
}
|
||||||
cache.free();
|
return numFreed;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int free(MemoryRegionCache<?> cache) {
|
||||||
|
if (cache == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return cache.free();
|
||||||
}
|
}
|
||||||
|
|
||||||
void trim() {
|
void trim() {
|
||||||
@ -384,13 +387,16 @@ final class PoolThreadCache {
|
|||||||
/**
|
/**
|
||||||
* Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
|
* Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
|
||||||
*/
|
*/
|
||||||
public void free() {
|
public int free() {
|
||||||
|
int numFreed = 0;
|
||||||
entriesInUse = 0;
|
entriesInUse = 0;
|
||||||
maxEntriesInUse = 0;
|
maxEntriesInUse = 0;
|
||||||
for (int i = head;; i = nextIdx(i)) {
|
for (int i = head;; i = nextIdx(i)) {
|
||||||
if (!freeEntry(entries[i])) {
|
if (freeEntry(entries[i])) {
|
||||||
|
numFreed++;
|
||||||
|
} else {
|
||||||
// all cleared
|
// all cleared
|
||||||
return;
|
return numFreed;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -440,7 +446,7 @@ final class PoolThreadCache {
|
|||||||
|
|
||||||
private int nextIdx(int index) {
|
private int nextIdx(int index) {
|
||||||
// use bitwise operation as this is faster as using modulo.
|
// use bitwise operation as this is faster as using modulo.
|
||||||
return (index + 1) & entries.length - 1;
|
return index + 1 & entries.length - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class Entry<T> {
|
private static final class Entry<T> {
|
||||||
|
@ -16,18 +16,13 @@
|
|||||||
|
|
||||||
package io.netty.buffer;
|
package io.netty.buffer;
|
||||||
|
|
||||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
import io.netty.util.ThreadDeathWatcher;
|
||||||
import io.netty.util.concurrent.ScheduledFuture;
|
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import io.netty.util.internal.SystemPropertyUtil;
|
import io.netty.util.internal.SystemPropertyUtil;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.IdentityHashMap;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
||||||
@ -44,7 +39,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
private static final int DEFAULT_NORMAL_CACHE_SIZE;
|
private static final int DEFAULT_NORMAL_CACHE_SIZE;
|
||||||
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
|
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
|
||||||
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
|
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
|
||||||
private static final long DEFAULT_CACHE_CLEANUP_INTERVAL;
|
|
||||||
|
|
||||||
private static final int MIN_PAGE_SIZE = 4096;
|
private static final int MIN_PAGE_SIZE = 4096;
|
||||||
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
|
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
|
||||||
@ -101,9 +95,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
|
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
|
||||||
"io.netty.allocator.cacheTrimInterval", 8192);
|
"io.netty.allocator.cacheTrimInterval", 8192);
|
||||||
|
|
||||||
// the default interval at which we check for caches that are assigned to Threads that are not alive anymore
|
|
||||||
DEFAULT_CACHE_CLEANUP_INTERVAL = SystemPropertyUtil.getLong(
|
|
||||||
"io.netty.allocator.cacheCleanupInterval", 5000);
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
|
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
|
||||||
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
|
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
|
||||||
@ -122,10 +113,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
|
logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
|
||||||
logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
|
logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
|
||||||
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
|
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
|
||||||
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}",
|
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
|
||||||
DEFAULT_CACHE_TRIM_INTERVAL);
|
|
||||||
logger.debug("-Dio.netty.allocator.cacheCleanupInterval: {} ms",
|
|
||||||
DEFAULT_CACHE_CLEANUP_INTERVAL);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,15 +147,8 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
|
|
||||||
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
|
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
|
||||||
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
|
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
|
||||||
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
|
|
||||||
normalCacheSize, DEFAULT_CACHE_CLEANUP_INTERVAL);
|
|
||||||
}
|
|
||||||
|
|
||||||
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
|
|
||||||
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
|
|
||||||
long cacheThreadAliveCheckInterval) {
|
|
||||||
super(preferDirect);
|
super(preferDirect);
|
||||||
threadCache = new PoolThreadLocalCache(cacheThreadAliveCheckInterval);
|
threadCache = new PoolThreadLocalCache();
|
||||||
this.tinyCacheSize = tinyCacheSize;
|
this.tinyCacheSize = tinyCacheSize;
|
||||||
this.smallCacheSize = smallCacheSize;
|
this.smallCacheSize = smallCacheSize;
|
||||||
this.normalCacheSize = normalCacheSize;
|
this.normalCacheSize = normalCacheSize;
|
||||||
@ -276,19 +257,10 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
final class PoolThreadLocalCache extends ThreadLocal<PoolThreadCache> {
|
final class PoolThreadLocalCache extends ThreadLocal<PoolThreadCache> {
|
||||||
private final Map<Thread, PoolThreadCache> caches = new IdentityHashMap<Thread, PoolThreadCache>();
|
|
||||||
private final ReleaseCacheTask task = new ReleaseCacheTask();
|
|
||||||
private final AtomicInteger index = new AtomicInteger();
|
private final AtomicInteger index = new AtomicInteger();
|
||||||
private final long cacheThreadAliveCheckInterval;
|
|
||||||
|
|
||||||
PoolThreadLocalCache(long cacheThreadAliveCheckInterval) {
|
|
||||||
this.cacheThreadAliveCheckInterval = cacheThreadAliveCheckInterval;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PoolThreadCache get() {
|
protected PoolThreadCache initialValue() {
|
||||||
PoolThreadCache cache = super.get();
|
|
||||||
if (cache == null) {
|
|
||||||
final int idx = index.getAndIncrement();
|
final int idx = index.getAndIncrement();
|
||||||
final PoolArena<byte[]> heapArena;
|
final PoolArena<byte[]> heapArena;
|
||||||
final PoolArena<ByteBuffer> directArena;
|
final PoolArena<ByteBuffer> directArena;
|
||||||
@ -304,68 +276,25 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
|
|||||||
} else {
|
} else {
|
||||||
directArena = null;
|
directArena = null;
|
||||||
}
|
}
|
||||||
// If the current Thread is assigned to an EventExecutor we can
|
|
||||||
// easily free the cached stuff again once the EventExecutor completes later.
|
final PoolThreadCache cache = new PoolThreadCache(
|
||||||
cache = new PoolThreadCache(
|
|
||||||
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
|
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
|
||||||
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
|
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
|
||||||
set(cache);
|
|
||||||
}
|
|
||||||
return cache;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void set(PoolThreadCache value) {
|
|
||||||
Thread current = Thread.currentThread();
|
|
||||||
synchronized (caches) {
|
|
||||||
caches.put(current, value);
|
|
||||||
if (task.releaseTaskFuture == null) {
|
|
||||||
task.releaseTaskFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(task,
|
|
||||||
cacheThreadAliveCheckInterval, cacheThreadAliveCheckInterval, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
super.set(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void remove() {
|
|
||||||
super.remove();
|
|
||||||
PoolThreadCache cache;
|
|
||||||
Thread current = Thread.currentThread();
|
|
||||||
synchronized (caches) {
|
|
||||||
cache = caches.remove(current);
|
|
||||||
}
|
|
||||||
if (cache != null) {
|
|
||||||
cache.free();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final class ReleaseCacheTask implements Runnable {
|
|
||||||
private ScheduledFuture<?> releaseTaskFuture;
|
|
||||||
|
|
||||||
|
// The thread-local cache will keep a list of pooled buffers which must be returned to
|
||||||
|
// the pool when the thread is not alive anymore.
|
||||||
|
final Thread thread = Thread.currentThread();
|
||||||
|
ThreadDeathWatcher.watch(thread, new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
synchronized (caches) {
|
int numFreed = cache.free();
|
||||||
for (Iterator<Map.Entry<Thread, PoolThreadCache>> i = caches.entrySet().iterator();
|
if (logger.isDebugEnabled()) {
|
||||||
i.hasNext();) {
|
logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, thread.getName());
|
||||||
Map.Entry<Thread, PoolThreadCache> cache = i.next();
|
|
||||||
if (cache.getKey().isAlive()) {
|
|
||||||
// Thread is still alive...
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
cache.getValue().free();
|
|
||||||
i.remove();
|
|
||||||
}
|
|
||||||
if (caches.isEmpty()) {
|
|
||||||
// Nothing in the caches anymore so no need to continue to check if something needs to be
|
|
||||||
// released periodically. The task will be rescheduled if there is any need later.
|
|
||||||
if (releaseTaskFuture != null) {
|
|
||||||
releaseTaskFuture.cancel(true);
|
|
||||||
releaseTaskFuture = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return cache;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,19 +15,10 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.util;
|
package io.netty.util;
|
||||||
|
|
||||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
|
||||||
import io.netty.util.concurrent.ScheduledFuture;
|
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.IdentityHashMap;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Collection of method to handle objects that may implement {@link ReferenceCounted}.
|
* Collection of method to handle objects that may implement {@link ReferenceCounted}.
|
||||||
*/
|
*/
|
||||||
@ -106,8 +97,6 @@ public final class ReferenceCountUtil {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Map<Thread, List<Entry>> pendingReleases = new IdentityHashMap<Thread, List<Entry>>();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedules the specified object to be released when the caller thread terminates. Note that this operation is
|
* Schedules the specified object to be released when the caller thread terminates. Note that this operation is
|
||||||
* intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the
|
* intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the
|
||||||
@ -124,82 +113,40 @@ public final class ReferenceCountUtil {
|
|||||||
*/
|
*/
|
||||||
public static <T> T releaseLater(T msg, int decrement) {
|
public static <T> T releaseLater(T msg, int decrement) {
|
||||||
if (msg instanceof ReferenceCounted) {
|
if (msg instanceof ReferenceCounted) {
|
||||||
synchronized (pendingReleases) {
|
ThreadDeathWatcher.watch(Thread.currentThread(), new ReleasingTask((ReferenceCounted) msg, decrement));
|
||||||
Thread thread = Thread.currentThread();
|
|
||||||
List<Entry> entries = pendingReleases.get(thread);
|
|
||||||
if (entries == null) {
|
|
||||||
// Start the periodic releasing task (if not started yet.)
|
|
||||||
if (pendingReleases.isEmpty()) {
|
|
||||||
ReleasingTask task = new ReleasingTask();
|
|
||||||
task.future = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(task, 1, 1, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new entry.
|
|
||||||
entries = new ArrayList<Entry>();
|
|
||||||
pendingReleases.put(thread, entries);
|
|
||||||
}
|
|
||||||
|
|
||||||
entries.add(new Entry((ReferenceCounted) msg, decrement));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class Entry {
|
|
||||||
final ReferenceCounted obj;
|
|
||||||
final int decrement;
|
|
||||||
|
|
||||||
Entry(ReferenceCounted obj, int decrement) {
|
|
||||||
this.obj = obj;
|
|
||||||
this.decrement = decrement;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
return StringUtil.simpleClassName(obj) + ".release(" + decrement + ") refCnt: " + obj.refCnt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Releases the objects when the thread that called {@link #releaseLater(Object)} has been terminated.
|
* Releases the objects when the thread that called {@link #releaseLater(Object)} has been terminated.
|
||||||
*/
|
*/
|
||||||
private static final class ReleasingTask implements Runnable {
|
private static final class ReleasingTask implements Runnable {
|
||||||
volatile ScheduledFuture<?> future;
|
|
||||||
|
private final ReferenceCounted obj;
|
||||||
|
private final int decrement;
|
||||||
|
|
||||||
|
ReleasingTask(ReferenceCounted obj, int decrement) {
|
||||||
|
this.obj = obj;
|
||||||
|
this.decrement = decrement;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
synchronized (pendingReleases) {
|
|
||||||
for (Iterator<Map.Entry<Thread, List<Entry>>> i = pendingReleases.entrySet().iterator();
|
|
||||||
i.hasNext();) {
|
|
||||||
|
|
||||||
Map.Entry<Thread, List<Entry>> e = i.next();
|
|
||||||
if (e.getKey().isAlive()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
releaseAll(e.getValue());
|
|
||||||
|
|
||||||
// Remove from the map since the thread is not alive anymore.
|
|
||||||
i.remove();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pendingReleases.isEmpty()) {
|
|
||||||
future.cancel(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void releaseAll(Iterable<Entry> entries) {
|
|
||||||
for (Entry e: entries) {
|
|
||||||
try {
|
try {
|
||||||
if (!e.obj.release(e.decrement)) {
|
if (!obj.release(decrement)) {
|
||||||
logger.warn("Non-zero refCnt: {}", e);
|
logger.warn("Non-zero refCnt: {}", this);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Released: {}", e);
|
logger.debug("Released: {}", this);
|
||||||
}
|
}
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
logger.warn("Failed to release an object: {}", e.obj, ex);
|
logger.warn("Failed to release an object: {}", obj, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return StringUtil.simpleClassName(obj) + ".release(" + decrement + ") refCnt: " + obj.refCnt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
168
common/src/main/java/io/netty/util/ThreadDeathWatcher.java
Normal file
168
common/src/main/java/io/netty/util/ThreadDeathWatcher.java
Normal file
@ -0,0 +1,168 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2014 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.util;
|
||||||
|
|
||||||
|
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||||
|
import io.netty.util.internal.MpscLinkedQueue;
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if a thread is alive periodically and runs a task when a thread dies.
|
||||||
|
* <p>
|
||||||
|
* This thread starts a daemon thread to check the state of the threads being watched and to invoke their
|
||||||
|
* associated {@link Runnable}s. When there is no thread to watch (i.e. all threads are dead), the daemon thread
|
||||||
|
* will terminate itself, and a new daemon thread will be started again when a new watch is added.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
public final class ThreadDeathWatcher {
|
||||||
|
|
||||||
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
|
||||||
|
private static final ThreadFactory threadFactory =
|
||||||
|
new DefaultThreadFactory(ThreadDeathWatcher.class, true, Thread.MIN_PRIORITY);
|
||||||
|
|
||||||
|
private static final Queue<Entry> pendingEntries = PlatformDependent.newMpscQueue();
|
||||||
|
private static final Watcher watcher = new Watcher();
|
||||||
|
private static final AtomicBoolean started = new AtomicBoolean();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedules the specified {@code task} to run when the specified {@code thread} dies.
|
||||||
|
*
|
||||||
|
* @param thread the {@link Thread} to watch
|
||||||
|
* @param task the {@link Runnable} to run when the {@code thread} dies
|
||||||
|
*
|
||||||
|
* @throws IllegalArgumentException if the specified {@code thread} is not alive
|
||||||
|
*/
|
||||||
|
public static void watch(Thread thread, Runnable task) {
|
||||||
|
if (thread == null) {
|
||||||
|
throw new NullPointerException("thread");
|
||||||
|
}
|
||||||
|
if (task == null) {
|
||||||
|
throw new NullPointerException("task");
|
||||||
|
}
|
||||||
|
if (!thread.isAlive()) {
|
||||||
|
throw new IllegalArgumentException("thread must be alive.");
|
||||||
|
}
|
||||||
|
|
||||||
|
pendingEntries.add(new Entry(thread, task));
|
||||||
|
|
||||||
|
if (started.compareAndSet(false, true)) {
|
||||||
|
Thread watcherThread = threadFactory.newThread(watcher);
|
||||||
|
watcherThread.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ThreadDeathWatcher() { }
|
||||||
|
|
||||||
|
private static final class Watcher implements Runnable {
|
||||||
|
|
||||||
|
private final List<Entry> watchees = new ArrayList<Entry>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (;;) {
|
||||||
|
fetchWatchees();
|
||||||
|
notifyWatchees();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException ignore) {
|
||||||
|
// Ignore the interrupt; do not terminate until all tasks are run.
|
||||||
|
}
|
||||||
|
|
||||||
|
if (watchees.isEmpty() && pendingEntries.isEmpty()) {
|
||||||
|
|
||||||
|
// Mark the current worker thread as stopped.
|
||||||
|
// The following CAS must always success and must be uncontended,
|
||||||
|
// because only one watcher thread should be running at the same time.
|
||||||
|
boolean stopped = started.compareAndSet(true, false);
|
||||||
|
assert stopped;
|
||||||
|
|
||||||
|
// Check if there are pending entries added by watch() while we do CAS above.
|
||||||
|
if (pendingEntries.isEmpty()) {
|
||||||
|
// A) watch() was not invoked and thus there's nothing to handle
|
||||||
|
// -> safe to terminate because there's nothing left to do
|
||||||
|
// B) a new watcher thread started and handled them all
|
||||||
|
// -> safe to terminate the new watcher thread will take care the rest
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// There are pending entries again, added by watch()
|
||||||
|
if (!started.compareAndSet(false, true)) {
|
||||||
|
// watch() started a new watcher thread and set 'started' to true.
|
||||||
|
// -> terminate this thread so that the new watcher reads from pendingEntries exclusively.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// watch() added an entry, but this worker was faster to set 'started' to true.
|
||||||
|
// i.e. a new watcher thread was not started
|
||||||
|
// -> keep this thread alive to handle the newly added entries.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fetchWatchees() {
|
||||||
|
for (;;) {
|
||||||
|
Entry e = pendingEntries.poll();
|
||||||
|
if (e == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
watchees.add(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void notifyWatchees() {
|
||||||
|
List<Entry> watchees = this.watchees;
|
||||||
|
for (int i = 0; i < watchees.size();) {
|
||||||
|
Entry e = watchees.get(i);
|
||||||
|
if (!e.thread.isAlive()) {
|
||||||
|
watchees.remove(i);
|
||||||
|
try {
|
||||||
|
e.task.run();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("Thread death watcher task raised an exception:", t);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
i ++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class Entry extends MpscLinkedQueue.Node<Entry> {
|
||||||
|
final Thread thread;
|
||||||
|
final Runnable task;
|
||||||
|
|
||||||
|
Entry(Thread thread, Runnable task) {
|
||||||
|
this.thread = thread;
|
||||||
|
this.task = task;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Entry value() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,73 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2014 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.netty.util;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.*;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class ThreadDeathWatcherTest {
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testSimple() throws Exception {
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
final Thread t = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (;;) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException ignore) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
final Runnable task = new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (!t.isAlive()) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
ThreadDeathWatcher.watch(t, task);
|
||||||
|
fail("must reject to watch a non-alive thread.");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
|
||||||
|
t.start();
|
||||||
|
ThreadDeathWatcher.watch(t, task);
|
||||||
|
|
||||||
|
// As long as the thread is alive, the task should not run.
|
||||||
|
assertThat(latch.await(750, TimeUnit.MILLISECONDS), is(false));
|
||||||
|
|
||||||
|
// Interrupt the thread to terminate it.
|
||||||
|
t.interrupt();
|
||||||
|
|
||||||
|
// The task must be run on termination.
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user