Fixed|SimpleChannelPool.close() should only return after complete. (#7927)

Motivation:

We need to ensure we only return from close() after all work is done as otherwise we may close the EventExecutor before we dispatched everything.

Modifications:

Correctly wait on operations to complete before return.

Result:

Fixes https://github.com/netty/netty/issues/7901.
This commit is contained in:
Norman Maurer 2018-05-21 19:22:31 +02:00 committed by GitHub
parent 88f0586a7e
commit 583fc272f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 21 deletions

View File

@ -20,6 +20,7 @@ import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
@ -437,27 +438,43 @@ public class FixedChannelPool extends SimpleChannelPool {
@Override @Override
public void close() { public void close() {
executor.execute(new Runnable() { if (executor.inEventLoop()) {
@Override close0();
public void run() { } else {
if (!closed) { executor.submit(new Runnable() {
closed = true; @Override
for (;;) { public void run() {
AcquireTask task = pendingAcquireQueue.poll(); close0();
if (task == null) { }
break; }).awaitUninterruptibly();
} }
ScheduledFuture<?> f = task.timeoutFuture; }
if (f != null) {
f.cancel(false); private void close0() {
} if (!closed) {
task.promise.setFailure(new ClosedChannelException()); closed = true;
} for (;;) {
acquiredChannelCount = 0; AcquireTask task = pendingAcquireQueue.poll();
pendingAcquireCount = 0; if (task == null) {
break;
}
ScheduledFuture<?> f = task.timeoutFuture;
if (f != null) {
f.cancel(false);
}
task.promise.setFailure(new ClosedChannelException());
}
acquiredChannelCount = 0;
pendingAcquireCount = 0;
// Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
// to ensure we will not block in a EventExecutor.
GlobalEventExecutor.INSTANCE.execute(new Runnable() {
@Override
public void run() {
FixedChannelPool.super.close(); FixedChannelPool.super.close();
} }
} });
}); }
} }
} }

View File

@ -394,7 +394,8 @@ public class SimpleChannelPool implements ChannelPool {
if (channel == null) { if (channel == null) {
break; break;
} }
channel.close(); // Just ignore any errors that are reported back from close().
channel.close().awaitUninterruptibly();
} }
} }
} }