Fix a class loader leak in ForkJoinPool

Motivation:

As reported in #4211, when using Netty in Tomcat (or other container based deployment), ForkJoinPool leaks an instance of `Submitter` so that the class loader of `Submitter` won't be GCed. However, since `Submitter` is just a wrapper of `int`, we can replace it with `int[1]`.

Modifications:

Replace `Submitter` with `int[1]`.

Result:

No class loader leak in ForkJoinPool when using in a container.
This commit is contained in:
Xiaoyan Lin 2016-05-17 19:05:29 -07:00 committed by Norman Maurer
parent 2b340df452
commit 55dd7d035f

View File

@ -1078,8 +1078,13 @@ public class ForkJoinPool extends AbstractExecutorService {
* to avoid contention in one pool is likely to hold for others.
* Lazily initialized on first submission (but null-checked
* in other contexts to avoid unnecessary initialization).
*
* Note: this was changed to fix https://github.com/netty/netty/issues/4211
* Instead of using "ThreadLocal<Submitter>" like jsr166e, just use "ThreadLocal<int[]>" to
* avoid leaking the Submitter's class loader. Here "int[]" is just an array with exactly one
* int.
*/
static final ThreadLocal<Submitter> submitters;
static final ThreadLocal<int[]> submitters;
/**
* Creates a new ForkJoinWorkerThread. This factory is used unless
@ -1478,10 +1483,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* randomly modified upon collisions using xorshifts, which
* requires a non-zero seed.
*/
static final class Submitter {
int seed;
Submitter(int s) { seed = s; }
}
//static final class Submitter {
// int seed;
// Submitter(int s) { seed = s; }
//}
/**
* Unless shutting down, adds the given task to a submission queue
@ -1492,12 +1497,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
Submitter z = submitters.get();
int[] z = submitters.get();
WorkQueue q; int r, m, s, n, am; ForkJoinTask<?>[] a;
int ps = plock;
WorkQueue[] ws = workQueues;
if (z != null && ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & (r = z.seed) & SQMASK]) != null && r != 0 &&
(q = ws[m & (r = z[0]) & SQMASK]) != null && r != 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
@ -1533,18 +1538,18 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
private void fullExternalPush(ForkJoinTask<?> task) {
int r = 0; // random index seed
for (Submitter z = submitters.get();;) {
for (int[] z = submitters.get();;) {
WorkQueue[] ws; WorkQueue q; int ps, m, k;
if (z == null) {
if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
r += SEED_INCREMENT) && r != 0)
submitters.set(z = new Submitter(r));
submitters.set(z = new int[]{ r });
}
else if (r == 0) { // move to a different index
r = z.seed;
r = z[0];
r ^= r << 13; // same xorshift as WorkQueues
r ^= r >>> 17;
z.seed = r ^= (r << 5);
z[0] = r ^= (r << 5);
}
if ((ps = plock) < 0)
throw new RejectedExecutionException();
@ -2313,12 +2318,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* least one task.
*/
static WorkQueue commonSubmitterQueue() {
Submitter z; ForkJoinPool p; WorkQueue[] ws; int m, r;
int[] z; ForkJoinPool p; WorkQueue[] ws; int m, r;
return ((z = submitters.get()) != null &&
(p = common) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0) ?
ws[m & z.seed & SQMASK] : null;
ws[m & z[0] & SQMASK] : null;
}
/**
@ -2326,11 +2331,11 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final boolean tryExternalUnpush(ForkJoinTask<?> task) {
WorkQueue joiner; ForkJoinTask<?>[] a; int m, s;
Submitter z = submitters.get();
int[] z = submitters.get();
WorkQueue[] ws = workQueues;
boolean popped = false;
if (z != null && ws != null && (m = ws.length - 1) >= 0 &&
(joiner = ws[z.seed & m & SQMASK]) != null &&
(joiner = ws[z[0] & m & SQMASK]) != null &&
joiner.base != (s = joiner.top) &&
(a = joiner.array) != null) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
@ -2349,11 +2354,11 @@ public class ForkJoinPool extends AbstractExecutorService {
final int externalHelpComplete(CountedCompleter<?> task) {
WorkQueue joiner; int m, j;
Submitter z = submitters.get();
int[] z = submitters.get();
WorkQueue[] ws = workQueues;
int s = 0;
if (z != null && ws != null && (m = ws.length - 1) >= 0 &&
(joiner = ws[(j = z.seed) & m & SQMASK]) != null && task != null) {
(joiner = ws[(j = z[0]) & m & SQMASK]) != null && task != null) {
int scans = m + m + 1;
long c = 0L; // for stability check
j |= 1; // poll odd queues
@ -3279,7 +3284,7 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new Error(e);
}
submitters = new ThreadLocal<Submitter>();
submitters = new ThreadLocal<int[]>();
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");