Improve the logic around acquire channel function is improved.

Motivation:
The acquire channel function resulted in calling itself several times in case when channel polled from the pool queue was unhealthy, which resulted FixedChannelPool to be called several times which in it's turn caused FixedChannelPool.acquire() to be called and resulted into acquireChannelCount to be unnecessary increased.
Example use case:
1) Create FixedChannelPool instance with one channel in the pool: new FixedChannelPool(cb, handler, 1)
2) Acquire channel A from the pool
3) close the channel A
4) Return it back to the pool
5) Acquire channel from the same pool again
Expected result:
new channel created and acquired, channel A that has been closed discarded and removed from the pool from being unhealthy
Actual result:
Channel A had been removed from the pool, how ever the new channel had never be acquired, instead the request to acquire had been added to the pending queue in FixedChannelPool and the acquireChannelCount is increased by one. The reason is that at the time when SimpleChannelPool figured out that the channel was unhealthy called FixedChannelPool.acquire to try to acquire new channel, how ever the request was added to the pendingTakQueue because by the time when FixedChannelPool.acquire was called, the acquireChannelCount was already "1" so new channel ould not be created cause of maxChannelsLimit=1.

Modifications:
The suggested approach modifies the SimpleChannelPool in a way so that when channel detected to be unhealthy it calls private method SimpleChannelPool.acquireHealthyFromPoolOrNew() which guarantees that SimpleChannelPool actually either finds a healthy channel in the pool and returns it or causes the promise.cause() in case when new channel was failed to be created.

 Result:
The  ```acquiredChannelCount``` is now calculated correctly as a result of SimpleChannelPool.acquire() of not being recursive on overridable acquire method.
This commit is contained in:
Ivan Bahdanau 2015-08-11 19:55:54 -07:00 committed by Norman Maurer
parent 0be53f296f
commit 5d011b8895
2 changed files with 51 additions and 3 deletions

View File

@ -31,7 +31,7 @@ import io.netty.util.internal.PlatformDependent;
import java.util.Deque; import java.util.Deque;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.*;
/** /**
* Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
@ -91,6 +91,15 @@ public class SimpleChannelPool implements ChannelPool {
@Override @Override
public Future<Channel> acquire(final Promise<Channel> promise) { public Future<Channel> acquire(final Promise<Channel> promise) {
checkNotNull(promise, "promise"); checkNotNull(promise, "promise");
return acquireHealthyFromPoolOrNew(promise);
}
/**
* Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
* @param promise the promise to provide acquire result.
* @return future for acquiring a channel.
*/
private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
try { try {
final Channel ch = pollChannel(); final Channel ch = pollChannel();
if (ch == null) { if (ch == null) {
@ -165,11 +174,11 @@ public class SimpleChannelPool implements ChannelPool {
} }
} else { } else {
closeChannel(ch); closeChannel(ch);
acquire(promise); acquireHealthyFromPoolOrNew(promise);
} }
} else { } else {
closeChannel(ch); closeChannel(ch);
acquire(promise); acquireHealthyFromPoolOrNew(promise);
} }
} }

View File

@ -151,6 +151,45 @@ public class FixedChannelPoolTest {
group.shutdownGracefully(); group.shutdownGracefully();
} }
/**
* Tests that the acquiredChannelCount is not added up several times for the same channel acquire request.
* @throws Exception
*/
@Test
public void testAcquireNewConnectionWhen() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
ChannelPoolHandler handler = new TestChannelPoolHandler();
ChannelPool pool = new FixedChannelPool(cb, handler, 1);
Channel channel1 = pool.acquire().syncUninterruptibly().getNow();
channel1.close().syncUninterruptibly();
pool.release(channel1);
Channel channel2 = pool.acquire().syncUninterruptibly().getNow();
assertNotSame(channel1, channel2);
sc.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
group.shutdownGracefully();
}
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testAcquireBoundQueue() throws Exception { public void testAcquireBoundQueue() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup(); EventLoopGroup group = new LocalEventLoopGroup();