Synchronously close pools when closing AbstractChannelPoolMap (#9857)

Motivation:

In #9830 the get/remove/close methods implementation changed to avoid
deadlocks on event loops. The change involved modifying the methods to
close the managed ChannelPools asynchronously and return immediately.
While this behavior might be fine for get/remove, it is changing what
a user expects from a close() method and after returning from close()
there might be still resources open.

Modifications:

This change is a follow-up for #9830 to preserve the synchronous
behavior of the AbstractChannelPoolMap#close() method.

Result:

AbstractChannelPoolMap#close() returns once the managed pools have
been closed.
This commit is contained in:
Robert Mihaly 2019-12-11 09:28:44 +01:00 committed by Norman Maurer
parent 66fca3b58e
commit af58bfcfb8
2 changed files with 128 additions and 4 deletions

View File

@ -15,6 +15,10 @@
*/
package io.netty.channel.pool;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReadOnlyIterator;
@ -51,6 +55,9 @@ public abstract class AbstractChannelPoolMap<K, P extends ChannelPool>
* Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed,
* {@code false} otherwise.
*
* If the removed pool extends {@link SimpleChannelPool} it will be closed asynchronously to avoid blocking in
* this method.
*
* Please note that {@code null} keys are not allowed.
*/
public final boolean remove(K key) {
@ -62,17 +69,48 @@ public abstract class AbstractChannelPoolMap<K, P extends ChannelPool>
return false;
}
/**
* Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns a future that comletes with a
* {@code true} result if the pool has been removed by this call, otherwise the result is {@code false}.
*
* If the removed pool extends {@link SimpleChannelPool} it will be closed asynchronously to avoid blocking in
* this method. The returned future will be completed once this asynchronous pool close operation completes.
*/
private Future<Boolean> removeAsyncIfSupported(K key) {
P pool = map.remove(checkNotNull(key, "key"));
if (pool != null) {
final Promise<Boolean> removePromise = GlobalEventExecutor.INSTANCE.newPromise();
poolCloseAsyncIfSupported(pool).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
removePromise.setSuccess(Boolean.TRUE);
} else {
removePromise.setFailure(future.cause());
}
}
});
return removePromise;
}
return GlobalEventExecutor.INSTANCE.newSucceededFuture(Boolean.FALSE);
}
/**
* If the pool implementation supports asynchronous close, then use it to avoid a blocking close call in case
* the ChannelPoolMap operations are called from an EventLoop.
*
* @param pool the ChannelPool to be closed
*/
private static void poolCloseAsyncIfSupported(ChannelPool pool) {
private static Future<Void> poolCloseAsyncIfSupported(ChannelPool pool) {
if (pool instanceof SimpleChannelPool) {
((SimpleChannelPool) pool).closeAsync();
return ((SimpleChannelPool) pool).closeAsync();
} else {
try {
pool.close();
return GlobalEventExecutor.INSTANCE.newSucceededFuture(null);
} catch (Exception e) {
return GlobalEventExecutor.INSTANCE.newFailedFuture(e);
}
}
}
@ -108,7 +146,8 @@ public abstract class AbstractChannelPoolMap<K, P extends ChannelPool>
@Override
public final void close() {
for (K key: map.keySet()) {
remove(key);
// Wait for remove to finish to ensure that resources are released before returning from close
removeAsyncIfSupported(key).syncUninterruptibly();
}
}
}

View File

@ -22,9 +22,14 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import org.junit.Test;
import java.net.ConnectException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@ -67,10 +72,90 @@ public class AbstractChannelPoolMapTest {
pool.acquire().syncUninterruptibly();
}
@Test
public void testRemoveClosesChannelPool() {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
final Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
AbstractChannelPoolMap<EventLoop, TestPool> poolMap =
new AbstractChannelPoolMap<EventLoop, TestPool>() {
@Override
protected TestPool newPool(EventLoop key) {
return new TestPool(cb.clone(key), new TestChannelPoolHandler());
}
};
EventLoop loop = group.next();
TestPool pool = poolMap.get(loop);
assertTrue(poolMap.remove(loop));
// the pool should be closed eventually after remove
pool.closeFuture.awaitUninterruptibly(1, TimeUnit.SECONDS);
assertTrue(pool.closeFuture.isDone());
}
@Test
public void testCloseClosesPoolsImmediately() {
EventLoopGroup group = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
final Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group)
.channel(LocalChannel.class);
AbstractChannelPoolMap<EventLoop, TestPool> poolMap =
new AbstractChannelPoolMap<EventLoop, TestPool>() {
@Override
protected TestPool newPool(EventLoop key) {
return new TestPool(cb.clone(key), new TestChannelPoolHandler());
}
};
EventLoop loop = group.next();
TestPool pool = poolMap.get(loop);
assertFalse(pool.closeFuture.isDone());
// the pool should be closed immediately after remove
poolMap.close();
assertTrue(pool.closeFuture.isDone());
}
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
@Override
public void channelCreated(Channel ch) throws Exception {
// NOOP
}
}
private static final class TestPool extends SimpleChannelPool {
private final Promise<Void> closeFuture;
TestPool(Bootstrap bootstrap, ChannelPoolHandler handler) {
super(bootstrap, handler);
EventExecutor executor = bootstrap.config().group().next();
closeFuture = executor.newPromise();
}
@Override
public Future<Void> closeAsync() {
Future<Void> poolClose = super.closeAsync();
poolClose.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
closeFuture.setSuccess(null);
} else {
closeFuture.setFailure(future.cause());
}
}
});
return poolClose;
}
}
}