Correctly count acquired channels when timeout occurs in FixedChannelPool
Motivation: We don't decrease acquired channel count in FixedChannelPool when timeout occurs by AcquireTimeoutAction.NEW and eventually fails. Modifications: Set AcquireTask.acquired=true to call decrementAndRunTaskQueue when timeout action fails. Result: Acquired channel count decreases correctly.
This commit is contained in:
parent
2edcb80c49
commit
b6dc1a4cec
@ -117,7 +117,7 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
* be failed.
|
* be failed.
|
||||||
*/
|
*/
|
||||||
public FixedChannelPool(Bootstrap bootstrap,
|
public FixedChannelPool(Bootstrap bootstrap,
|
||||||
ChannelPoolHandler handler,
|
ChannelPoolHandler handler,
|
||||||
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
|
ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
|
||||||
final long acquireTimeoutMillis,
|
final long acquireTimeoutMillis,
|
||||||
int maxConnections, int maxPendingAcquires) {
|
int maxConnections, int maxPendingAcquires) {
|
||||||
@ -153,7 +153,7 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
public void onTimeout(AcquireTask task) {
|
public void onTimeout(AcquireTask task) {
|
||||||
// Increment the acquire count and delegate to super to actually acquire a Channel which will
|
// Increment the acquire count and delegate to super to actually acquire a Channel which will
|
||||||
// create a new connetion.
|
// create a new connetion.
|
||||||
++acquiredChannelCount;
|
task.acquired();
|
||||||
|
|
||||||
FixedChannelPool.super.acquire(task.promise);
|
FixedChannelPool.super.acquire(task.promise);
|
||||||
}
|
}
|
||||||
@ -191,14 +191,14 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
assert executor.inEventLoop();
|
assert executor.inEventLoop();
|
||||||
|
|
||||||
if (acquiredChannelCount < maxConnections) {
|
if (acquiredChannelCount < maxConnections) {
|
||||||
++acquiredChannelCount;
|
assert acquiredChannelCount >= 0;
|
||||||
|
|
||||||
assert acquiredChannelCount > 0;
|
|
||||||
|
|
||||||
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
|
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
|
||||||
// EventLoop
|
// EventLoop
|
||||||
Promise<Channel> p = executor.newPromise();
|
Promise<Channel> p = executor.newPromise();
|
||||||
p.addListener(new AcquireListener(promise));
|
AcquireListener l = new AcquireListener(promise);
|
||||||
|
l.acquired();
|
||||||
|
p.addListener(l);
|
||||||
super.acquire(p);
|
super.acquire(p);
|
||||||
} else {
|
} else {
|
||||||
if (pendingAcquireCount >= maxPendingAcquires) {
|
if (pendingAcquireCount >= maxPendingAcquires) {
|
||||||
@ -271,10 +271,8 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
timeoutFuture.cancel(false);
|
timeoutFuture.cancel(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
task.acquired();
|
|
||||||
|
|
||||||
--pendingAcquireCount;
|
--pendingAcquireCount;
|
||||||
++acquiredChannelCount;
|
task.acquired();
|
||||||
|
|
||||||
super.acquire(task.promise);
|
super.acquire(task.promise);
|
||||||
}
|
}
|
||||||
@ -291,7 +289,7 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
ScheduledFuture<?> timeoutFuture;
|
ScheduledFuture<?> timeoutFuture;
|
||||||
|
|
||||||
public AcquireTask(Promise<Channel> promise) {
|
public AcquireTask(Promise<Channel> promise) {
|
||||||
super(promise, false);
|
super(promise);
|
||||||
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
|
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
|
||||||
// EventLoop.
|
// EventLoop.
|
||||||
this.promise = executor.<Channel>newPromise().addListener(this);
|
this.promise = executor.<Channel>newPromise().addListener(this);
|
||||||
@ -327,12 +325,7 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
protected boolean acquired;
|
protected boolean acquired;
|
||||||
|
|
||||||
AcquireListener(Promise<Channel> originalPromise) {
|
AcquireListener(Promise<Channel> originalPromise) {
|
||||||
this(originalPromise, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected AcquireListener(Promise<Channel> originalPromise, boolean acquired) {
|
|
||||||
this.originalPromise = originalPromise;
|
this.originalPromise = originalPromise;
|
||||||
this.acquired = acquired;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -353,6 +346,10 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void acquired() {
|
public void acquired() {
|
||||||
|
if (acquired) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
acquiredChannelCount++;
|
||||||
acquired = true;
|
acquired = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user