From af58bfcfb802e2b05549deb93c6ec219ef6be77b Mon Sep 17 00:00:00 2001 From: Robert Mihaly Date: Wed, 11 Dec 2019 09:28:44 +0100 Subject: [PATCH] Synchronously close pools when closing AbstractChannelPoolMap (#9857) Motivation: In #9830 the get/remove/close methods implementation changed to avoid deadlocks on event loops. The change involved modifying the methods to close the managed ChannelPools asynchronously and return immediately. While this behavior might be fine for get/remove, it is changing what a user expects from a close() method and after returning from close() there might be still resources open. Modifications: This change is a follow-up for #9830 to preserve the synchronous behavior of the AbstractChannelPoolMap#close() method. Result: AbstractChannelPoolMap#close() returns once the managed pools have been closed. --- .../channel/pool/AbstractChannelPoolMap.java | 47 +++++++++- .../pool/AbstractChannelPoolMapTest.java | 85 +++++++++++++++++++ 2 files changed, 128 insertions(+), 4 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java index b243c0b0d1..a58c93f821 100644 --- a/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java +++ b/transport/src/main/java/io/netty/channel/pool/AbstractChannelPoolMap.java @@ -15,6 +15,10 @@ */ package io.netty.channel.pool; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ReadOnlyIterator; @@ -51,6 +55,9 @@ public abstract class AbstractChannelPoolMap * Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed, * {@code false} otherwise. * + * If the removed pool extends {@link SimpleChannelPool} it will be closed asynchronously to avoid blocking in + * this method. + * * Please note that {@code null} keys are not allowed. */ public final boolean remove(K key) { @@ -62,17 +69,48 @@ public abstract class AbstractChannelPoolMap return false; } + /** + * Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns a future that comletes with a + * {@code true} result if the pool has been removed by this call, otherwise the result is {@code false}. + * + * If the removed pool extends {@link SimpleChannelPool} it will be closed asynchronously to avoid blocking in + * this method. The returned future will be completed once this asynchronous pool close operation completes. + */ + private Future removeAsyncIfSupported(K key) { + P pool = map.remove(checkNotNull(key, "key")); + if (pool != null) { + final Promise removePromise = GlobalEventExecutor.INSTANCE.newPromise(); + poolCloseAsyncIfSupported(pool).addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + removePromise.setSuccess(Boolean.TRUE); + } else { + removePromise.setFailure(future.cause()); + } + } + }); + return removePromise; + } + return GlobalEventExecutor.INSTANCE.newSucceededFuture(Boolean.FALSE); + } + /** * If the pool implementation supports asynchronous close, then use it to avoid a blocking close call in case * the ChannelPoolMap operations are called from an EventLoop. * * @param pool the ChannelPool to be closed */ - private static void poolCloseAsyncIfSupported(ChannelPool pool) { + private static Future poolCloseAsyncIfSupported(ChannelPool pool) { if (pool instanceof SimpleChannelPool) { - ((SimpleChannelPool) pool).closeAsync(); + return ((SimpleChannelPool) pool).closeAsync(); } else { - pool.close(); + try { + pool.close(); + return GlobalEventExecutor.INSTANCE.newSucceededFuture(null); + } catch (Exception e) { + return GlobalEventExecutor.INSTANCE.newFailedFuture(e); + } } } @@ -108,7 +146,8 @@ public abstract class AbstractChannelPoolMap @Override public final void close() { for (K key: map.keySet()) { - remove(key); + // Wait for remove to finish to ensure that resources are released before returning from close + removeAsyncIfSupported(key).syncUninterruptibly(); } } } diff --git a/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java b/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java index 8d03cd7b6c..2df53abf27 100644 --- a/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java +++ b/transport/src/test/java/io/netty/channel/pool/AbstractChannelPoolMapTest.java @@ -22,9 +22,14 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalEventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; import org.junit.Test; import java.net.ConnectException; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -67,10 +72,90 @@ public class AbstractChannelPoolMapTest { pool.acquire().syncUninterruptibly(); } + @Test + public void testRemoveClosesChannelPool() { + EventLoopGroup group = new LocalEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + final Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + AbstractChannelPoolMap poolMap = + new AbstractChannelPoolMap() { + @Override + protected TestPool newPool(EventLoop key) { + return new TestPool(cb.clone(key), new TestChannelPoolHandler()); + } + }; + + EventLoop loop = group.next(); + + TestPool pool = poolMap.get(loop); + assertTrue(poolMap.remove(loop)); + + // the pool should be closed eventually after remove + pool.closeFuture.awaitUninterruptibly(1, TimeUnit.SECONDS); + assertTrue(pool.closeFuture.isDone()); + } + + @Test + public void testCloseClosesPoolsImmediately() { + EventLoopGroup group = new LocalEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + final Bootstrap cb = new Bootstrap(); + cb.remoteAddress(addr); + cb.group(group) + .channel(LocalChannel.class); + + AbstractChannelPoolMap poolMap = + new AbstractChannelPoolMap() { + @Override + protected TestPool newPool(EventLoop key) { + return new TestPool(cb.clone(key), new TestChannelPoolHandler()); + } + }; + + EventLoop loop = group.next(); + + TestPool pool = poolMap.get(loop); + assertFalse(pool.closeFuture.isDone()); + + // the pool should be closed immediately after remove + poolMap.close(); + assertTrue(pool.closeFuture.isDone()); + } + private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { @Override public void channelCreated(Channel ch) throws Exception { // NOOP } } + + private static final class TestPool extends SimpleChannelPool { + private final Promise closeFuture; + + TestPool(Bootstrap bootstrap, ChannelPoolHandler handler) { + super(bootstrap, handler); + EventExecutor executor = bootstrap.config().group().next(); + closeFuture = executor.newPromise(); + } + + @Override + public Future closeAsync() { + Future poolClose = super.closeAsync(); + poolClose.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + closeFuture.setSuccess(null); + } else { + closeFuture.setFailure(future.cause()); + } + } + }); + return poolClose; + } + } }