From 5d011b88958f867e99a94bdfae334dfb8ce1697c Mon Sep 17 00:00:00 2001 From: Ivan Bahdanau Date: Tue, 11 Aug 2015 19:55:54 -0700 Subject: [PATCH] Improve the logic around acquire channel function is improved. Motivation: The acquire channel function resulted in calling itself several times in case when channel polled from the pool queue was unhealthy, which resulted FixedChannelPool to be called several times which in it's turn caused FixedChannelPool.acquire() to be called and resulted into acquireChannelCount to be unnecessary increased. Example use case: 1) Create FixedChannelPool instance with one channel in the pool: new FixedChannelPool(cb, handler, 1) 2) Acquire channel A from the pool 3) close the channel A 4) Return it back to the pool 5) Acquire channel from the same pool again Expected result: new channel created and acquired, channel A that has been closed discarded and removed from the pool from being unhealthy Actual result: Channel A had been removed from the pool, how ever the new channel had never be acquired, instead the request to acquire had been added to the pending queue in FixedChannelPool and the acquireChannelCount is increased by one. The reason is that at the time when SimpleChannelPool figured out that the channel was unhealthy called FixedChannelPool.acquire to try to acquire new channel, how ever the request was added to the pendingTakQueue because by the time when FixedChannelPool.acquire was called, the acquireChannelCount was already "1" so new channel ould not be created cause of maxChannelsLimit=1. Modifications: The suggested approach modifies the SimpleChannelPool in a way so that when channel detected to be unhealthy it calls private method SimpleChannelPool.acquireHealthyFromPoolOrNew() which guarantees that SimpleChannelPool actually either finds a healthy channel in the pool and returns it or causes the promise.cause() in case when new channel was failed to be created. Result: The ```acquiredChannelCount``` is now calculated correctly as a result of SimpleChannelPool.acquire() of not being recursive on overridable acquire method. --- .../netty/channel/pool/SimpleChannelPool.java | 15 +++++-- .../channel/pool/FixedChannelPoolTest.java | 39 +++++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java index 8526bbc5ac..311279a229 100644 --- a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java +++ b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java @@ -31,7 +31,7 @@ import io.netty.util.internal.PlatformDependent; import java.util.Deque; -import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static io.netty.util.internal.ObjectUtil.*; /** * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire @@ -91,6 +91,15 @@ public class SimpleChannelPool implements ChannelPool { @Override public Future acquire(final Promise promise) { checkNotNull(promise, "promise"); + return acquireHealthyFromPoolOrNew(promise); + } + + /** + * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise. + * @param promise the promise to provide acquire result. + * @return future for acquiring a channel. + */ + private Future acquireHealthyFromPoolOrNew(final Promise promise) { try { final Channel ch = pollChannel(); if (ch == null) { @@ -165,11 +174,11 @@ public class SimpleChannelPool implements ChannelPool { } } else { closeChannel(ch); - acquire(promise); + acquireHealthyFromPoolOrNew(promise); } } else { closeChannel(ch); - acquire(promise); + acquireHealthyFromPoolOrNew(promise); } } diff --git a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java index a8848f7644..a3e12ec7ba 100644 --- a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java +++ b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java @@ -151,6 +151,45 @@ public class FixedChannelPoolTest { group.shutdownGracefully(); } + /** + * Tests that the acquiredChannelCount is not added up several times for the same channel acquire request. + * @throws Exception + */ + @Test + public void testAcquireNewConnectionWhen() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).syncUninterruptibly().channel(); + ChannelPoolHandler handler = new TestChannelPoolHandler(); + ChannelPool pool = new FixedChannelPool(cb, handler, 1); + Channel channel1 = pool.acquire().syncUninterruptibly().getNow(); + channel1.close().syncUninterruptibly(); + pool.release(channel1); + + Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); + + assertNotSame(channel1, channel2); + sc.close().syncUninterruptibly(); + channel2.close().syncUninterruptibly(); + group.shutdownGracefully(); + } + @Test(expected = IllegalStateException.class) public void testAcquireBoundQueue() throws Exception { EventLoopGroup group = new LocalEventLoopGroup();