[#4373] Fix assert error when trying to release Channel to closed FixedChannelPool

Motivation:

Once a FixedChannelPool was closed we must not allow to acquire or release Channels to prevent assert errors.

Modifications:

Fail release and acquire calls when FixedChannelPool is closed.

Result:

No more assert errors.1
This commit is contained in:
Norman Maurer 2015-10-22 10:50:15 +02:00
parent 40e0fbfcb6
commit 082e9cc722
2 changed files with 66 additions and 12 deletions

View File

@ -69,6 +69,7 @@ public final class FixedChannelPool extends SimpleChannelPool {
private final int maxPendingAcquires; private final int maxPendingAcquires;
private int acquiredChannelCount; private int acquiredChannelCount;
private int pendingAcquireCount; private int pendingAcquireCount;
private boolean closed;
/** /**
* Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}. * Creates a new instance using the {@link ChannelHealthChecker#ACTIVE}.
@ -217,6 +218,10 @@ public final class FixedChannelPool extends SimpleChannelPool {
private void acquire0(final Promise<Channel> promise) { private void acquire0(final Promise<Channel> promise) {
assert executor.inEventLoop(); assert executor.inEventLoop();
if (closed) {
promise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
return;
}
if (acquiredChannelCount < maxConnections) { if (acquiredChannelCount < maxConnections) {
assert acquiredChannelCount >= 0; assert acquiredChannelCount >= 0;
@ -256,6 +261,11 @@ public final class FixedChannelPool extends SimpleChannelPool {
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
assert executor.inEventLoop(); assert executor.inEventLoop();
if (closed) {
promise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
return;
}
if (future.isSuccess()) { if (future.isSuccess()) {
decrementAndRunTaskQueue(); decrementAndRunTaskQueue();
promise.setSuccess(null); promise.setSuccess(null);
@ -359,6 +369,11 @@ public final class FixedChannelPool extends SimpleChannelPool {
public void operationComplete(Future<Channel> future) throws Exception { public void operationComplete(Future<Channel> future) throws Exception {
assert executor.inEventLoop(); assert executor.inEventLoop();
if (closed) {
originalPromise.setFailure(new IllegalStateException("FixedChannelPooled was closed"));
return;
}
if (future.isSuccess()) { if (future.isSuccess()) {
originalPromise.setSuccess(future.getNow()); originalPromise.setSuccess(future.getNow());
} else { } else {
@ -386,20 +401,23 @@ public final class FixedChannelPool extends SimpleChannelPool {
executor.execute(new OneTimeTask() { executor.execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
for (;;) { if (!closed) {
AcquireTask task = pendingAcquireQueue.poll(); closed = true;
if (task == null) { for (;;) {
break; AcquireTask task = pendingAcquireQueue.poll();
if (task == null) {
break;
}
ScheduledFuture<?> f = task.timeoutFuture;
if (f != null) {
f.cancel(false);
}
task.promise.setFailure(new ClosedChannelException());
} }
ScheduledFuture<?> f = task.timeoutFuture; acquiredChannelCount = 0;
if (f != null) { pendingAcquireCount = 0;
f.cancel(false); FixedChannelPool.super.close();
}
task.promise.setFailure(new ClosedChannelException());
} }
acquiredChannelCount = 0;
pendingAcquireCount = 0;
FixedChannelPool.super.close();
} }
}); });
} }

View File

@ -263,6 +263,42 @@ public class FixedChannelPoolTest {
} }
} }
@Test
public void testReleaseAfterClosePool() throws Exception {
EventLoopGroup group = new LocalEventLoopGroup(1);
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
cb.remoteAddress(addr);
cb.group(group).channel(LocalChannel.class);
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
// Start server
Channel sc = sb.bind(addr).syncUninterruptibly().channel();
FixedChannelPool pool = new FixedChannelPool(cb, new TestChannelPoolHandler(), 2);
final Future<Channel> acquire = pool.acquire();
final Channel channel = acquire.get();
pool.close();
group.submit(new Runnable() {
@Override
public void run() {
// NOOP
}
}).syncUninterruptibly();
pool.release(channel).syncUninterruptibly();
sc.close().syncUninterruptibly();
channel.close().syncUninterruptibly();
}
private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler { private static final class TestChannelPoolHandler extends AbstractChannelPoolHandler {
@Override @Override
public void channelCreated(Channel ch) throws Exception { public void channelCreated(Channel ch) throws Exception {