Make sync close for FixedChannelPool truly synchronous (#9226)

Motivation:

In the current implementation, the synchronous close() method for FixedChannelPool returns
after scheduling the channels to close via a single threaded executor asynchronously. Closing a channel
requires event loop group, however, there might be a scenario when the application has closed
the event loop group after the sync close() completes. In this scenario an exception is thrown
(event loop rejected the execution) when the single threaded executor tries to close the channel.

Modifications:

Complete the close function only after all the channels have been close and introduce
closeAsync() method for cases when the current/existing behaviour is desired.

Result:

Close function would completely when the channels have been closed
This commit is contained in:
Divij Vaidya 2019-06-14 03:01:14 -07:00 committed by Norman Maurer
parent dc2649e95d
commit fa1dedcc0f
2 changed files with 80 additions and 12 deletions

View File

@ -28,6 +28,7 @@ import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -441,19 +442,46 @@ public class FixedChannelPool extends SimpleChannelPool {
@Override @Override
public void close() { public void close() {
if (executor.inEventLoop()) { try {
close0(); closeAsync().await();
} else { } catch (InterruptedException e) {
executor.submit(new Runnable() { Thread.currentThread().interrupt();
@Override throw new RuntimeException(e);
public void run() {
close0();
}
}).awaitUninterruptibly();
} }
} }
private void close0() { /**
* Closes the pool in an async manner.
*
* @return Future which represents completion of the close task
*/
public Future<Void> closeAsync() {
if (executor.inEventLoop()) {
return close0();
} else {
final Promise<Void> closeComplete = executor.newPromise();
executor.execute(new Runnable() {
@Override
public void run() {
close0().addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> f) throws Exception {
if (f.isSuccess()) {
closeComplete.setSuccess(null);
} else {
closeComplete.setFailure(f.cause());
}
}
});
}
});
return closeComplete;
}
}
private Future<Void> close0() {
assert executor.inEventLoop();
if (!closed) { if (!closed) {
closed = true; closed = true;
for (;;) { for (;;) {
@ -472,12 +500,15 @@ public class FixedChannelPool extends SimpleChannelPool {
// Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
// to ensure we will not block in a EventExecutor. // to ensure we will not block in a EventExecutor.
GlobalEventExecutor.INSTANCE.execute(new Runnable() { return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
@Override @Override
public void run() { public Void call() throws Exception {
FixedChannelPool.super.close(); FixedChannelPool.super.close();
return null;
} }
}); });
} }
return GlobalEventExecutor.INSTANCE.newSucceededFuture(null);
} }
} }

View File

@ -27,10 +27,13 @@ import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel; import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction; import io.netty.channel.pool.FixedChannelPool.AcquireTimeoutAction;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -346,6 +349,40 @@ public class FixedChannelPoolTest {
sc.close().syncUninterruptibly(); sc.close().syncUninterruptibly();
} }
@Test
public void testCloseAsync() throws ExecutionException, InterruptedException {
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group).channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
final Channel sc = sb.bind(addr).syncUninterruptibly().channel();
final FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2);
pool.acquire().get();
pool.acquire().get();
pool.closeAsync().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
Assert.assertEquals(0, pool.acquiredChannelCount());
sc.close().syncUninterruptibly();
}
}).awaitUninterruptibly();
}
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
@Override @Override
public void channelCreated(Channel ch) throws Exception { public void channelCreated(Channel ch) throws Exception {