Ensure the best effort is made even if future listeners could not be notified / Handle registration failure in a robust manner

- Related: #1187
This commit is contained in:
Trustin Lee 2013-03-21 17:48:10 +09:00
parent 9175abc451
commit a980638190
5 changed files with 91 additions and 54 deletions

View File

@ -481,6 +481,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} else { } else {
final Object listeners = this.listeners; final Object listeners = this.listeners;
this.listeners = null; this.listeners = null;
try {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -491,6 +492,9 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} }
} }
}); });
} catch (Throwable t) {
logger.error("Failed to notify listener(s). Event loop terminated?", t);
}
} }
} }
@ -505,8 +509,10 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static void notifyListener(final EventExecutor eventExecutor, final Future<?> future, protected static void notifyListener(
final EventExecutor eventExecutor, final Future<?> future,
final GenericFutureListener<? extends Future<?>> l) { final GenericFutureListener<? extends Future<?>> l) {
if (eventExecutor.inEventLoop()) { if (eventExecutor.inEventLoop()) {
final Integer stackDepth = LISTENER_STACK_DEPTH.get(); final Integer stackDepth = LISTENER_STACK_DEPTH.get();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) { if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
@ -520,12 +526,16 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} }
} }
try {
eventExecutor.execute(new Runnable() { eventExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
notifyListener(eventExecutor, future, l); notifyListener(eventExecutor, future, l);
} }
}); });
} catch (Throwable t) {
logger.error("Failed to notify a listener. Event loop terminated?", t);
}
} }
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })

View File

@ -564,30 +564,32 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
throw new NullPointerException("eventLoop"); throw new NullPointerException("eventLoop");
} }
if (isRegistered()) { if (isRegistered()) {
throw new IllegalStateException("registered to an event loop already"); promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
} }
if (!isCompatible(eventLoop)) { if (!isCompatible(eventLoop)) {
throw new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()); promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
} }
AbstractChannel.this.eventLoop = eventLoop; AbstractChannel.this.eventLoop = eventLoop;
assert eventLoop().inEventLoop();
// check if the eventLoop which was given is currently in the eventloop.
// if that is the case we are safe to call register, if not we need to
// schedule the execution as otherwise we may say some race-conditions.
//
// See https://github.com/netty/netty/issues/654
if (eventLoop.inEventLoop()) { if (eventLoop.inEventLoop()) {
register0(promise); register0(promise);
} else { } else {
try {
eventLoop.execute(new Runnable() { eventLoop.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
register0(promise); register0(promise);
} }
}); });
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was unaccepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
promise.setFailure(t);
}
} }
} }

View File

@ -16,8 +16,8 @@
package io.netty.channel; package io.netty.channel;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.TaskScheduler;
import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.concurrent.TaskScheduler;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -48,31 +48,19 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
@Override @Override
public ChannelFuture register(Channel channel) { public ChannelFuture register(Channel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
return register(channel, channel.newPromise()); return register(channel, channel.newPromise());
} }
@Override @Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) { public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (isShutdown()) { if (channel == null) {
channel.unsafe().closeForcibly(); throw new NullPointerException("channel");
promise.setFailure(new EventLoopException("cannot register a channel to a shut down loop")); }
return promise; if (promise == null) {
throw new NullPointerException("promise");
} }
if (inEventLoop()) {
channel.unsafe().register(this, promise); channel.unsafe().register(this, promise);
} else {
execute(new Runnable() {
@Override
public void run() {
channel.unsafe().register(SingleThreadEventLoop.this, promise);
}
});
}
return promise; return promise;
} }
} }

View File

@ -194,9 +194,10 @@ public class LocalChannel extends AbstractChannel {
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
if (peer.isActive()) { LocalChannel peer = this.peer;
if (peer != null && peer.isActive()) {
peer.unsafe().close(peer.unsafe().voidFuture()); peer.unsafe().close(peer.unsafe().voidFuture());
peer = null; this.peer = null;
} }
} }

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.channel.local.LocalChannel;
import io.netty.util.concurrent.TaskScheduler; import io.netty.util.concurrent.TaskScheduler;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -24,11 +25,13 @@ import java.util.Queue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class SingleThreadEventLoopTest { public class SingleThreadEventLoopTest {
@ -244,6 +247,45 @@ public class SingleThreadEventLoopTest {
assertEquals(NUM_TASKS, ranTasks.get()); assertEquals(NUM_TASKS, ranTasks.get());
} }
@Test(timeout = 10000)
public void testRegistrationAfterTermination() throws Exception {
loop.shutdown();
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);
}
ChannelFuture f = loop.register(new LocalChannel());
f.awaitUninterruptibly();
assertFalse(f.isSuccess());
assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));
}
@Test(timeout = 10000)
public void testRegistrationAfterTermination2() throws Exception {
loop.shutdown();
while (!loop.isTerminated()) {
loop.awaitTermination(1, TimeUnit.DAYS);
}
final CountDownLatch latch = new CountDownLatch(1);
Channel ch = new LocalChannel();
ChannelPromise promise = ch.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
latch.countDown();
}
});
ChannelFuture f = loop.register(ch, promise);
f.awaitUninterruptibly();
assertFalse(f.isSuccess());
assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));
// Ensure the listener was notified.
assertFalse(latch.await(1, TimeUnit.SECONDS));
}
private static class SingleThreadEventLoopImpl extends SingleThreadEventLoop { private static class SingleThreadEventLoopImpl extends SingleThreadEventLoop {
final AtomicInteger cleanedUp = new AtomicInteger(); final AtomicInteger cleanedUp = new AtomicInteger();
@ -274,11 +316,5 @@ public class SingleThreadEventLoopTest {
protected void cleanup() { protected void cleanup() {
cleanedUp.incrementAndGet(); cleanedUp.incrementAndGet();
} }
@Override
public ChannelFuture register(Channel channel, ChannelPromise future) {
// Untested
return future;
}
} }
} }