Fix StackOverflowError raised by DefaultPromise.notifyListeners() when ImmediateEventExecutor was used

- Fixed #1602
This commit is contained in:
Trustin Lee 2013-07-18 14:03:48 +09:00
parent b130ee6a6c
commit 0f2542ded5
2 changed files with 115 additions and 28 deletions

View File

@ -535,38 +535,47 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final Integer stackDepth = LISTENER_STACK_DEPTH.get();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
LISTENER_STACK_DEPTH.set(stackDepth + 1);
try {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0(this, (DefaultFutureListeners) listeners);
} else {
@SuppressWarnings("unchecked")
final GenericFutureListener<? extends Future<V>> l =
(GenericFutureListener<? extends Future<V>>) listeners;
notifyListener0(this, l);
}
} finally {
LISTENER_STACK_DEPTH.set(stackDepth);
}
return;
}
}
try {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0(this, (DefaultFutureListeners) listeners);
final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
executor.execute(new Runnable() {
@Override
public void run() {
notifyListeners0(DefaultPromise.this, dfl);
}
});
} else {
@SuppressWarnings("unchecked")
final GenericFutureListener<? extends Future<V>> l =
(GenericFutureListener<? extends Future<V>>) listeners;
notifyListener0(this, l);
}
} else {
try {
if (listeners instanceof DefaultFutureListeners) {
final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
executor.execute(new Runnable() {
@Override
public void run() {
notifyListeners0(DefaultPromise.this, dfl);
}
});
} else {
@SuppressWarnings("unchecked")
final GenericFutureListener<? extends Future<V>> l =
(GenericFutureListener<? extends Future<V>>) listeners;
executor.execute(new Runnable() {
@Override
public void run() {
notifyListener0(DefaultPromise.this, l);
}
});
}
} catch (Throwable t) {
logger.error("Failed to notify listener(s). Event loop shut down?", t);
executor.execute(new Runnable() {
@Override
public void run() {
notifyListener0(DefaultPromise.this, l);
}
});
}
} catch (Throwable t) {
logger.error("Failed to notify listener(s). Event loop shut down?", t);
}
}
@ -598,7 +607,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
eventExecutor.execute(new Runnable() {
@Override
public void run() {
notifyListener(eventExecutor, future, l);
notifyListener0(future, l);
}
});
} catch (Throwable t) {
@ -607,7 +616,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {

View File

@ -0,0 +1,78 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
@SuppressWarnings("unchecked")
public class DefaultPromiseTest {
@Test
public void testNoStackOverflowErrorWithImmediateEventExecutorA() throws Exception {
final Promise<Void>[] p = new DefaultPromise[128];
for (int i = 0; i < p.length; i ++) {
final int finalI = i;
p[i] = new DefaultPromise<Void>(ImmediateEventExecutor.INSTANCE);
p[i].addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (finalI + 1 < p.length) {
p[finalI + 1].setSuccess(null);
}
}
});
}
p[0].setSuccess(null);
for (Promise<Void> a: p) {
assertThat(a.isSuccess(), is(true));
}
}
@Test
public void testNoStackOverflowErrorWithImmediateEventExecutorB() throws Exception {
final Promise<Void>[] p = new DefaultPromise[128];
for (int i = 0; i < p.length; i ++) {
final int finalI = i;
p[i] = new DefaultPromise<Void>(ImmediateEventExecutor.INSTANCE);
p[i].addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
DefaultPromise.notifyListener(ImmediateEventExecutor.INSTANCE, future, new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (finalI + 1 < p.length) {
p[finalI + 1].setSuccess(null);
}
}
});
}
});
}
p[0].setSuccess(null);
for (Promise<Void> a: p) {
assertThat(a.isSuccess(), is(true));
}
}
}