[#3988] Correctly count acquired channels in FixedChannelPool
Motivation: We missed to correctly count acquired channels in FixedChannelPool which could produce an assert error. Modifications: Only try to decrement acquired count if the channel was really acuired. Result: No more assert error possible.
This commit is contained in:
parent
5c7022d494
commit
467a07aa30
@ -271,6 +271,8 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
timeoutFuture.cancel(false);
|
timeoutFuture.cancel(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
task.acquired();
|
||||||
|
|
||||||
--pendingAcquireCount;
|
--pendingAcquireCount;
|
||||||
++acquiredChannelCount;
|
++acquiredChannelCount;
|
||||||
|
|
||||||
@ -289,7 +291,7 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
ScheduledFuture<?> timeoutFuture;
|
ScheduledFuture<?> timeoutFuture;
|
||||||
|
|
||||||
public AcquireTask(Promise<Channel> promise) {
|
public AcquireTask(Promise<Channel> promise) {
|
||||||
super(promise);
|
super(promise, false);
|
||||||
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
|
// We need to create a new promise as we need to ensure the AcquireListener runs in the correct
|
||||||
// EventLoop.
|
// EventLoop.
|
||||||
this.promise = executor.<Channel>newPromise().addListener(this);
|
this.promise = executor.<Channel>newPromise().addListener(this);
|
||||||
@ -322,9 +324,15 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
|
|
||||||
private class AcquireListener implements FutureListener<Channel> {
|
private class AcquireListener implements FutureListener<Channel> {
|
||||||
private final Promise<Channel> originalPromise;
|
private final Promise<Channel> originalPromise;
|
||||||
|
protected boolean acquired;
|
||||||
|
|
||||||
AcquireListener(Promise<Channel> originalPromise) {
|
AcquireListener(Promise<Channel> originalPromise) {
|
||||||
|
this(originalPromise, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected AcquireListener(Promise<Channel> originalPromise, boolean acquired) {
|
||||||
this.originalPromise = originalPromise;
|
this.originalPromise = originalPromise;
|
||||||
|
this.acquired = acquired;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -334,11 +342,19 @@ public final class FixedChannelPool extends SimpleChannelPool {
|
|||||||
if (future.isSuccess()) {
|
if (future.isSuccess()) {
|
||||||
originalPromise.setSuccess(future.getNow());
|
originalPromise.setSuccess(future.getNow());
|
||||||
} else {
|
} else {
|
||||||
// Something went wrong try to run pending acquire tasks.
|
if (acquired) {
|
||||||
decrementAndRunTaskQueue();
|
decrementAndRunTaskQueue();
|
||||||
|
} else {
|
||||||
|
runTaskQueue();
|
||||||
|
}
|
||||||
|
|
||||||
originalPromise.setFailure(future.cause());
|
originalPromise.setFailure(future.cause());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void acquired() {
|
||||||
|
acquired = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
x
Reference in New Issue
Block a user