Return the correct Future from FixedChannelPool.release()
Motivation: The behaviour of the FixedChannelPool.release was inconsistent with the SimpleChannelPool implementation, in that given promise is returned. In the FixedChannelPool implementation a new promise was return and this meant that the completion of that promise can be different. Specifically on releasing a channel to a closed pool, the parameter promise is failed with an IllegalStateException but the returned one will have been successful (as it was completed by call to super .release) Modification: Return the given promise as the result of FixedChannelPool.release Result: Returned promise will reflect the result of the release operation.
This commit is contained in:
parent
32b3f58f63
commit
b7a5743e8b
@ -21,6 +21,7 @@ import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
import io.netty.util.internal.ThrowableUtil;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
@ -41,7 +42,12 @@ public class FixedChannelPool extends SimpleChannelPool {
|
||||
private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
|
||||
new TimeoutException("Acquire operation took longer then configured maximum time"),
|
||||
FixedChannelPool.class, "<init>(...)");
|
||||
|
||||
static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
|
||||
new IllegalStateException("FixedChannelPooled was closed"),
|
||||
FixedChannelPool.class, "release(...)");
|
||||
static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
|
||||
new IllegalStateException("FixedChannelPooled was closed"),
|
||||
FixedChannelPool.class, "acquire0(...)");
|
||||
public enum AcquireTimeoutAction {
|
||||
/**
|
||||
* Create a new connection when the timeout is detected.
|
||||
@ -215,7 +221,7 @@ public class FixedChannelPool extends SimpleChannelPool {
|
||||
assert executor.inEventLoop();
|
||||
|
||||
if (closed) {
|
||||
promise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
|
||||
promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
|
||||
return;
|
||||
}
|
||||
if (acquiredChannelCount < maxConnections) {
|
||||
@ -250,6 +256,7 @@ public class FixedChannelPool extends SimpleChannelPool {
|
||||
|
||||
@Override
|
||||
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
|
||||
ObjectUtil.checkNotNull(promise, "promise");
|
||||
final Promise<Void> p = executor.newPromise();
|
||||
super.release(channel, p.addListener(new FutureListener<Void>() {
|
||||
|
||||
@ -260,7 +267,7 @@ public class FixedChannelPool extends SimpleChannelPool {
|
||||
if (closed) {
|
||||
// Since the pool is closed, we have no choice but to close the channel
|
||||
channel.close();
|
||||
promise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
|
||||
promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -277,7 +284,7 @@ public class FixedChannelPool extends SimpleChannelPool {
|
||||
}
|
||||
}
|
||||
}));
|
||||
return p;
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void decrementAndRunTaskQueue() {
|
||||
@ -372,7 +379,7 @@ public class FixedChannelPool extends SimpleChannelPool {
|
||||
// Since the pool is closed, we have no choice but to close the channel
|
||||
future.getNow().close();
|
||||
}
|
||||
originalPromise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
|
||||
originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -298,12 +298,15 @@ public class FixedChannelPoolTest {
|
||||
// NOOP
|
||||
}
|
||||
}).syncUninterruptibly();
|
||||
pool.release(channel).syncUninterruptibly();
|
||||
|
||||
// Since the pool is closed.. the release channel should have been closed
|
||||
try {
|
||||
pool.release(channel).syncUninterruptibly();
|
||||
fail();
|
||||
} catch (IllegalStateException e) {
|
||||
assertSame(FixedChannelPool.POOL_CLOSED_ON_RELEASE_EXCEPTION, e);
|
||||
}
|
||||
// Since the pool is closed, the Channel should have been closed as well.
|
||||
channel.closeFuture().syncUninterruptibly();
|
||||
assertFalse("Unexpected open channel", channel.isOpen());
|
||||
|
||||
sc.close().syncUninterruptibly();
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user