Return the correct Future from FixedChannelPool.release()

Motivation:

The behaviour of the FixedChannelPool.release was inconsistent with the
SimpleChannelPool implementation, in that given promise is returned.

In the FixedChannelPool implementation a new promise was return and
this meant that the completion of that promise can be different.
Specifically on releasing a channel to a closed pool, the parameter
promise is failed with an IllegalStateException but the returned one
will have been successful (as it was completed by call to super
.release)

Modification:

Return the given promise as the result of FixedChannelPool.release

Result:

Returned promise will reflect the result of the release operation.
This commit is contained in:
Norman Maurer 2017-06-27 11:55:23 +02:00
parent b6af2bac8b
commit c71e4459c8
2 changed files with 19 additions and 9 deletions

View File

@ -21,6 +21,7 @@ import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.ThrowableUtil;
import java.nio.channels.ClosedChannelException;
@ -41,7 +42,12 @@ public class FixedChannelPool extends SimpleChannelPool {
private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
new TimeoutException("Acquire operation took longer then configured maximum time"),
FixedChannelPool.class, "<init>(...)");
static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
new IllegalStateException("FixedChannelPooled was closed"),
FixedChannelPool.class, "release(...)");
static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
new IllegalStateException("FixedChannelPooled was closed"),
FixedChannelPool.class, "acquire0(...)");
public enum AcquireTimeoutAction {
/**
* Create a new connection when the timeout is detected.
@ -215,7 +221,7 @@ public class FixedChannelPool extends SimpleChannelPool {
assert executor.inEventLoop();
if (closed) {
promise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
return;
}
if (acquiredChannelCount < maxConnections) {
@ -250,6 +256,7 @@ public class FixedChannelPool extends SimpleChannelPool {
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
ObjectUtil.checkNotNull(promise, "promise");
final Promise<Void> p = executor.newPromise();
super.release(channel, p.addListener(new FutureListener<Void>() {
@ -260,7 +267,7 @@ public class FixedChannelPool extends SimpleChannelPool {
if (closed) {
// Since the pool is closed, we have no choice but to close the channel
channel.close();
promise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
return;
}
@ -277,7 +284,7 @@ public class FixedChannelPool extends SimpleChannelPool {
}
}
}));
return p;
return promise;
}
private void decrementAndRunTaskQueue() {
@ -372,7 +379,7 @@ public class FixedChannelPool extends SimpleChannelPool {
// Since the pool is closed, we have no choice but to close the channel
future.getNow().close();
}
originalPromise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
return;
}

View File

@ -297,12 +297,15 @@ public class FixedChannelPoolTest {
// NOOP
}
}).syncUninterruptibly();
pool.release(channel).syncUninterruptibly();
// Since the pool is closed.. the release channel should have been closed
try {
pool.release(channel).syncUninterruptibly();
fail();
} catch (IllegalStateException e) {
assertSame(FixedChannelPool.POOL_CLOSED_ON_RELEASE_EXCEPTION, e);
}
// Since the pool is closed, the Channel should have been closed as well.
channel.closeFuture().syncUninterruptibly();
assertFalse("Unexpected open channel", channel.isOpen());
sc.close().syncUninterruptibly();
}