[#4373] Fix assert error when trying to release Channel to closed FixedChannelPool

Motivation:

Once a FixedChannelPool was closed we must not allow to acquire or release Channels to prevent assert errors.

Modifications:

Fail release and acquire calls when FixedChannelPool is closed.

Result:

No more assert errors.1
This commit is contained in:
Norman Maurer 2015-10-22 10:50:15 +02:00
parent 2be4bb74a5
commit 8687475eb7
2 changed files with 66 additions and 12 deletions

View File

@ -69,6 +69,7 @@ public final class FixedChannelPool extends SimpleChannelPool {
private final int maxPendingAcquires;
private int acquiredChannelCount;
private int pendingAcquireCount;
private boolean closed;
/**
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
@ -217,6 +218,10 @@ public final class FixedChannelPool extends SimpleChannelPool {
private void acquire0(final Promise<Channel> promise) {
assert executor.inEventLoop();
if (closed) {
promise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
return;
}
if (acquiredChannelCount < maxConnections) {
assert acquiredChannelCount >= 0;
@ -256,6 +261,11 @@ public final class FixedChannelPool extends SimpleChannelPool {
public void operationComplete(Future<Void> future) throws Exception {
assert executor.inEventLoop();
if (closed) {
promise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
return;
}
if (future.isSuccess()) {
decrementAndRunTaskQueue();
promise.setSuccess(null);
@ -359,6 +369,11 @@ public final class FixedChannelPool extends SimpleChannelPool {
public void operationComplete(Future<Channel> future) throws Exception {
assert executor.inEventLoop();
if (closed) {
originalPromise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
return;
}
if (future.isSuccess()) {
originalPromise.setSuccess(future.getNow());
} else {
@ -386,20 +401,23 @@ public final class FixedChannelPool extends SimpleChannelPool {
executor.execute(new OneTimeTask() {
@Override
public void run() {
for (;;) {
AcquireTask task = pendingAcquireQueue.poll();
if (task == null) {
break;
if (!closed) {
closed = true;
for (;;) {
AcquireTask task = pendingAcquireQueue.poll();
if (task == null) {
break;
}
ScheduledFuture<?> f = task.timeoutFuture;
if (f != null) {
f.cancel(false);
}
task.promise.setFailure(new ClosedChannelException());
}
ScheduledFuture<?> f = task.timeoutFuture;
if (f != null) {
f.cancel(false);
}
task.promise.setFailure(new ClosedChannelException());
acquiredChannelCount = 0;
pendingAcquireCount = 0;
FixedChannelPool.super.close();
}
acquiredChannelCount = 0;
pendingAcquireCount = 0;
FixedChannelPool.super.close();
}
});
}

View File

@ -264,6 +264,42 @@ public class FixedChannelPoolTest {
}
}
@Test
public void testReleaseAfterClosePool() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup(1);
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group).channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2);
final Future<Channel> acquire = pool.acquire();
final Channel channel = acquire.get();
pool.close();
group.submit(new Runnable() {
@Override
public void run() {
// NOOP
}
}).syncUninterruptibly();
pool.release(channel).syncUninterruptibly();
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
}
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
@Override
public void channelCreated(Channel ch) throws Exception {