Fix a class loader leak in ForkJoinPool

Motivation:

As reported in #4211, when using Netty in Tomcat (or other container based deployment), ForkJoinPool leaks an instance of `Submitter` so that the class loader of `Submitter` won't be GCed. However, since `Submitter` is just a wrapper of `int`, we can replace it with `int[1]`.

Modifications:

Replace `Submitter` with `int[1]`.

Result:

No class loader leak in ForkJoinPool when using in a container.
This commit is contained in:
Xiaoyan Lin 2016-05-17 19:05:29 -07:00 committed by Norman Maurer
parent f984870ccc
commit 28bc1070e7

View File

@ -1078,8 +1078,13 @@ public class ForkJoinPool extends AbstractExecutorService {
* to avoid contention in one pool is likely to hold for others. * to avoid contention in one pool is likely to hold for others.
* Lazily initialized on first submission (but null-checked * Lazily initialized on first submission (but null-checked
* in other contexts to avoid unnecessary initialization). * in other contexts to avoid unnecessary initialization).
*
* Note: this was changed to fix https://github.com/netty/netty/issues/4211
* Instead of using "ThreadLocal<Submitter>" like jsr166e, just use "ThreadLocal<int[]>" to
* avoid leaking the Submitter's class loader. Here "int[]" is just an array with exactly one
* int.
*/ */
static final ThreadLocal<Submitter> submitters; static final ThreadLocal<int[]> submitters;
/** /**
* Creates a new ForkJoinWorkerThread. This factory is used unless * Creates a new ForkJoinWorkerThread. This factory is used unless
@ -1478,10 +1483,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* randomly modified upon collisions using xorshifts, which * randomly modified upon collisions using xorshifts, which
* requires a non-zero seed. * requires a non-zero seed.
*/ */
static final class Submitter { //static final class Submitter {
int seed; // int seed;
Submitter(int s) { seed = s; } // Submitter(int s) { seed = s; }
} //}
/** /**
* Unless shutting down, adds the given task to a submission queue * Unless shutting down, adds the given task to a submission queue
@ -1492,12 +1497,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param task the task. Caller must ensure non-null. * @param task the task. Caller must ensure non-null.
*/ */
final void externalPush(ForkJoinTask<?> task) { final void externalPush(ForkJoinTask<?> task) {
Submitter z = submitters.get(); int[] z = submitters.get();
WorkQueue q; int r, m, s, n, am; ForkJoinTask<?>[] a; WorkQueue q; int r, m, s, n, am; ForkJoinTask<?>[] a;
int ps = plock; int ps = plock;
WorkQueue[] ws = workQueues; WorkQueue[] ws = workQueues;
if (z != null && ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 && if (z != null && ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & (r = z.seed) & SQMASK]) != null && r != 0 && (q = ws[m & (r = z[0]) & SQMASK]) != null && r != 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
if ((a = q.array) != null && if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) { (am = a.length - 1) > (n = (s = q.top) - q.base)) {
@ -1533,18 +1538,18 @@ public class ForkJoinPool extends AbstractExecutorService {
*/ */
private void fullExternalPush(ForkJoinTask<?> task) { private void fullExternalPush(ForkJoinTask<?> task) {
int r = 0; // random index seed int r = 0; // random index seed
for (Submitter z = submitters.get();;) { for (int[] z = submitters.get();;) {
WorkQueue[] ws; WorkQueue q; int ps, m, k; WorkQueue[] ws; WorkQueue q; int ps, m, k;
if (z == null) { if (z == null) {
if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed, if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
r += SEED_INCREMENT) && r != 0) r += SEED_INCREMENT) && r != 0)
submitters.set(z = new Submitter(r)); submitters.set(z = new int[]{ r });
} }
else if (r == 0) { // move to a different index else if (r == 0) { // move to a different index
r = z.seed; r = z[0];
r ^= r << 13; // same xorshift as WorkQueues r ^= r << 13; // same xorshift as WorkQueues
r ^= r >>> 17; r ^= r >>> 17;
z.seed = r ^= (r << 5); z[0] = r ^= (r << 5);
} }
if ((ps = plock) < 0) if ((ps = plock) < 0)
throw new RejectedExecutionException(); throw new RejectedExecutionException();
@ -2313,12 +2318,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* least one task. * least one task.
*/ */
static WorkQueue commonSubmitterQueue() { static WorkQueue commonSubmitterQueue() {
Submitter z; ForkJoinPool p; WorkQueue[] ws; int m, r; int[] z; ForkJoinPool p; WorkQueue[] ws; int m, r;
return ((z = submitters.get()) != null && return ((z = submitters.get()) != null &&
(p = common) != null && (p = common) != null &&
(ws = p.workQueues) != null && (ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0) ? (m = ws.length - 1) >= 0) ?
ws[m & z.seed & SQMASK] : null; ws[m & z[0] & SQMASK] : null;
} }
/** /**
@ -2326,11 +2331,11 @@ public class ForkJoinPool extends AbstractExecutorService {
*/ */
final boolean tryExternalUnpush(ForkJoinTask<?> task) { final boolean tryExternalUnpush(ForkJoinTask<?> task) {
WorkQueue joiner; ForkJoinTask<?>[] a; int m, s; WorkQueue joiner; ForkJoinTask<?>[] a; int m, s;
Submitter z = submitters.get(); int[] z = submitters.get();
WorkQueue[] ws = workQueues; WorkQueue[] ws = workQueues;
boolean popped = false; boolean popped = false;
if (z != null && ws != null && (m = ws.length - 1) >= 0 && if (z != null && ws != null && (m = ws.length - 1) >= 0 &&
(joiner = ws[z.seed & m & SQMASK]) != null && (joiner = ws[z[0] & m & SQMASK]) != null &&
joiner.base != (s = joiner.top) && joiner.base != (s = joiner.top) &&
(a = joiner.array) != null) { (a = joiner.array) != null) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
@ -2349,11 +2354,11 @@ public class ForkJoinPool extends AbstractExecutorService {
final int externalHelpComplete(CountedCompleter<?> task) { final int externalHelpComplete(CountedCompleter<?> task) {
WorkQueue joiner; int m, j; WorkQueue joiner; int m, j;
Submitter z = submitters.get(); int[] z = submitters.get();
WorkQueue[] ws = workQueues; WorkQueue[] ws = workQueues;
int s = 0; int s = 0;
if (z != null && ws != null && (m = ws.length - 1) >= 0 && if (z != null && ws != null && (m = ws.length - 1) >= 0 &&
(joiner = ws[(j = z.seed) & m & SQMASK]) != null && task != null) { (joiner = ws[(j = z[0]) & m & SQMASK]) != null && task != null) {
int scans = m + m + 1; int scans = m + m + 1;
long c = 0L; // for stability check long c = 0L; // for stability check
j |= 1; // poll odd queues j |= 1; // poll odd queues
@ -3279,7 +3284,7 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new Error(e); throw new Error(e);
} }
submitters = new ThreadLocal<Submitter>(); submitters = new ThreadLocal<int[]>();
defaultForkJoinWorkerThreadFactory = defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory(); new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread"); modifyThreadPermission = new RuntimePermission("modifyThread");