Expose channel count for FixedChannelPool (#8059)

Motivation:

We use FixedChannelPool in production, and we believe we have a leak that doesn't return sockets to the pool (but they should be closed), thus blocking us from creating new connections when we need them. I haven't confirmed this yet, but right now I have to resort to reflection to access this field which makes me sad.

Modification:

Expose the acquiredChannelCount field through a getter method.

Result:

Allows introspection of the pool size in FixedChannelPool.
This commit is contained in:
Tyler Rockwood 2018-06-27 23:12:26 -07:00 committed by Norman Maurer
parent d5d1b898d5
commit 34b25dc94c

View File

@ -28,6 +28,7 @@ import io.netty.util.internal.ThrowableUtil;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -70,7 +71,7 @@ public class FixedChannelPool extends SimpleChannelPool {
private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>(); private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
private final int maxConnections; private final int maxConnections;
private final int maxPendingAcquires; private final int maxPendingAcquires;
private int acquiredChannelCount; private final AtomicInteger acquiredChannelCount = new AtomicInteger();
private int pendingAcquireCount; private int pendingAcquireCount;
private boolean closed; private boolean closed;
@ -229,6 +230,11 @@ public class FixedChannelPool extends SimpleChannelPool {
this.maxPendingAcquires = maxPendingAcquires; this.maxPendingAcquires = maxPendingAcquires;
} }
/** Returns the number of acquired channels that this pool thinks it has. */
public int acquiredChannelCount() {
return acquiredChannelCount.get();
}
@Override @Override
public Future<Channel> acquire(final Promise<Channel> promise) { public Future<Channel> acquire(final Promise<Channel> promise) {
try { try {
@ -255,8 +261,8 @@ public class FixedChannelPool extends SimpleChannelPool {
promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION); promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
return; return;
} }
if (acquiredChannelCount < maxConnections) { if (acquiredChannelCount.get() < maxConnections) {
assert acquiredChannelCount >= 0; assert acquiredChannelCount.get() >= 0;
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
// EventLoop // EventLoop
@ -319,10 +325,9 @@ public class FixedChannelPool extends SimpleChannelPool {
} }
private void decrementAndRunTaskQueue() { private void decrementAndRunTaskQueue() {
--acquiredChannelCount;
// We should never have a negative value. // We should never have a negative value.
assert acquiredChannelCount >= 0; int currentCount = acquiredChannelCount.decrementAndGet();
assert currentCount >= 0;
// Run the pending acquire tasks before notify the original promise so if the user would // Run the pending acquire tasks before notify the original promise so if the user would
// try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >= // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
@ -332,7 +337,7 @@ public class FixedChannelPool extends SimpleChannelPool {
} }
private void runTaskQueue() { private void runTaskQueue() {
while (acquiredChannelCount < maxConnections) { while (acquiredChannelCount.get() < maxConnections) {
AcquireTask task = pendingAcquireQueue.poll(); AcquireTask task = pendingAcquireQueue.poll();
if (task == null) { if (task == null) {
break; break;
@ -352,7 +357,7 @@ public class FixedChannelPool extends SimpleChannelPool {
// We should never have a negative value. // We should never have a negative value.
assert pendingAcquireCount >= 0; assert pendingAcquireCount >= 0;
assert acquiredChannelCount >= 0; assert acquiredChannelCount.get() >= 0;
} }
// AcquireTask extends AcquireListener to reduce object creations and so GC pressure // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
@ -431,7 +436,7 @@ public class FixedChannelPool extends SimpleChannelPool {
if (acquired) { if (acquired) {
return; return;
} }
acquiredChannelCount++; acquiredChannelCount.incrementAndGet();
acquired = true; acquired = true;
} }
} }
@ -464,7 +469,7 @@ public class FixedChannelPool extends SimpleChannelPool {
} }
task.promise.setFailure(new ClosedChannelException()); task.promise.setFailure(new ClosedChannelException());
} }
acquiredChannelCount = 0; acquiredChannelCount.set(0);
pendingAcquireCount = 0; pendingAcquireCount = 0;
// Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need