diff --git a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java index 8526bbc5ac..311279a229 100644 --- a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java +++ b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java @@ -31,7 +31,7 @@ import io.netty.util.internal.PlatformDependent; import java.util.Deque; -import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static io.netty.util.internal.ObjectUtil.*; /** * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire @@ -91,6 +91,15 @@ public class SimpleChannelPool implements ChannelPool { @Override public Future acquire(final Promise promise) { checkNotNull(promise, "promise"); + return acquireHealthyFromPoolOrNew(promise); + } + + /** + * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise. + * @param promise the promise to provide acquire result. + * @return future for acquiring a channel. + */ + private Future acquireHealthyFromPoolOrNew(final Promise promise) { try { final Channel ch = pollChannel(); if (ch == null) { @@ -165,11 +174,11 @@ public class SimpleChannelPool implements ChannelPool { } } else { closeChannel(ch); - acquire(promise); + acquireHealthyFromPoolOrNew(promise); } } else { closeChannel(ch); - acquire(promise); + acquireHealthyFromPoolOrNew(promise); } } diff --git a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java index 8a3da2d055..d9a2f8a3eb 100644 --- a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java +++ b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java @@ -151,6 +151,45 @@ public class FixedChannelPoolTest { group.shutdownGracefully(); } + /** + * Tests that the acquiredChannelCount is not added up several times for the same channel acquire request. + * @throws Exception + */ + @Test + public void testAcquireNewConnectionWhen() throws Exception { + EventLoopGroup group = new DefaultEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).syncUninterruptibly().channel(); + ChannelPoolHandler handler = new TestChannelPoolHandler(); + ChannelPool pool = new FixedChannelPool(cb, handler, 1); + Channel channel1 = pool.acquire().syncUninterruptibly().getNow(); + channel1.close().syncUninterruptibly(); + pool.release(channel1); + + Channel channel2 = pool.acquire().syncUninterruptibly().getNow(); + + assertNotSame(channel1, channel2); + sc.close().syncUninterruptibly(); + channel2.close().syncUninterruptibly(); + group.shutdownGracefully(); + } + @Test(expected = IllegalStateException.class) public void testAcquireBoundQueue() throws Exception { EventLoopGroup group = new DefaultEventLoopGroup();