diff --git a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java index 1f57b7d640..a3fca61c8a 100644 --- a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java +++ b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java @@ -69,6 +69,7 @@ public final class FixedChannelPool extends SimpleChannelPool { private final int maxPendingAcquires; private int acquiredChannelCount; private int pendingAcquireCount; + private boolean closed; /** * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. @@ -217,6 +218,10 @@ public final class FixedChannelPool extends SimpleChannelPool { private void acquire0(final Promise promise) { assert executor.inEventLoop(); + if (closed) { + promise.setFailure(new IllegalStateException("FixedChannelPooled was closed")); + return; + } if (acquiredChannelCount < maxConnections) { assert acquiredChannelCount >= 0; @@ -256,6 +261,11 @@ public final class FixedChannelPool extends SimpleChannelPool { public void operationComplete(Future future) throws Exception { assert executor.inEventLoop(); + if (closed) { + promise.setFailure(new IllegalStateException("FixedChannelPooled was closed")); + return; + } + if (future.isSuccess()) { decrementAndRunTaskQueue(); promise.setSuccess(null); @@ -359,6 +369,11 @@ public final class FixedChannelPool extends SimpleChannelPool { public void operationComplete(Future future) throws Exception { assert executor.inEventLoop(); + if (closed) { + originalPromise.setFailure(new IllegalStateException("FixedChannelPooled was closed")); + return; + } + if (future.isSuccess()) { originalPromise.setSuccess(future.getNow()); } else { @@ -386,20 +401,23 @@ public final class FixedChannelPool extends SimpleChannelPool { executor.execute(new OneTimeTask() { @Override public void run() { - for (;;) { - AcquireTask task = pendingAcquireQueue.poll(); - if (task == null) { - break; + if (!closed) { + closed = true; + for (;;) { + AcquireTask task = pendingAcquireQueue.poll(); + if (task == null) { + break; + } + ScheduledFuture f = task.timeoutFuture; + if (f != null) { + f.cancel(false); + } + task.promise.setFailure(new ClosedChannelException()); } - ScheduledFuture f = task.timeoutFuture; - if (f != null) { - f.cancel(false); - } - task.promise.setFailure(new ClosedChannelException()); + acquiredChannelCount = 0; + pendingAcquireCount = 0; + FixedChannelPool.super.close(); } - acquiredChannelCount = 0; - pendingAcquireCount = 0; - FixedChannelPool.super.close(); } }); } diff --git a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java index d9a2f8a3eb..e37c69c2ee 100644 --- a/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java +++ b/transport/src/test/java/io/netty/channel/pool/FixedChannelPoolTest.java @@ -263,6 +263,42 @@ public class FixedChannelPoolTest { } } + @Test + public void testReleaseAfterClosePool() throws Exception { + EventLoopGroup group = new LocalEventLoopGroup(1); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group).channel(LocalChannel.class); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); + } + }); + + // Start server + Channel sc = sb.bind(addr).syncUninterruptibly().channel(); + + FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2); + final Future acquire = pool.acquire(); + final Channel channel = acquire.get(); + pool.close(); + group.submit(new Runnable() { + @Override + public void run() { + // NOOP + } + }).syncUninterruptibly(); + pool.release(channel).syncUninterruptibly(); + sc.close().syncUninterruptibly(); + channel.close().syncUninterruptibly(); + } + private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { @Override public void channelCreated(Channel ch) throws Exception {