Fix possible IllegalStateException caused by closeNotifyTimeout when using SslHandler

Motivation:

In the SslHandler we schedule a timeout at which we close the Channel if a timeout was detected during close_notify. Because this can race with notify the flushFuture we can see an IllegalStateException when the Channel is closed.

Modifications:

- Use a trySuccess() and tryFailure(...) to guard against race.

Result:

No more race.
This commit is contained in:
Norman Maurer 2015-05-06 18:04:44 +02:00
parent 0615e538c1
commit 71f2e23633
4 changed files with 32 additions and 15 deletions

View File

@ -15,15 +15,21 @@
*/ */
package io.netty.util.concurrent; package io.netty.util.concurrent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/** /**
* {@link GenericFutureListener} implementation which takes other {@link Future}s * {@link GenericFutureListener} implementation which takes other {@link Future}s
* and notifies them on completion. * and notifies them on completion.
* *
* @param V the type of value returned by the future * @param <V> the type of value returned by the future
* @param F the type of future * @param <F> the type of future
*/ */
public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> { public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class);
private final Promise<? super V>[] promises; private final Promise<? super V>[] promises;
/** /**
@ -33,9 +39,7 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
*/ */
@SafeVarargs @SafeVarargs
public PromiseNotifier(Promise<? super V>... promises) { public PromiseNotifier(Promise<? super V>... promises) {
if (promises == null) { checkNotNull(promises, "promises");
throw new NullPointerException("promises");
}
for (Promise<? super V> promise: promises) { for (Promise<? super V> promise: promises) {
if (promise == null) { if (promise == null) {
throw new IllegalArgumentException("promises contains null Promise"); throw new IllegalArgumentException("promises contains null Promise");
@ -49,15 +53,18 @@ public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureLis
if (future.isSuccess()) { if (future.isSuccess()) {
V result = future.get(); V result = future.get();
for (Promise<? super V> p: promises) { for (Promise<? super V> p: promises) {
p.setSuccess(result); if (!p.trySuccess(result)) {
logger.warn("Failed to mark a promise as success because it is done already: {}", p);
}
} }
return; return;
} }
Throwable cause = future.cause(); Throwable cause = future.cause();
for (Promise<? super V> p: promises) { for (Promise<? super V> p: promises) {
p.setFailure(cause); if (!p.tryFailure(cause)) {
logger.warn("Failed to mark a promise as failure because it's done already: {}", p, cause);
}
} }
} }
} }

View File

@ -55,8 +55,8 @@ public class PromiseNotifierTest {
Future<Void> future = createStrictMock(Future.class); Future<Void> future = createStrictMock(Future.class);
expect(future.isSuccess()).andReturn(true); expect(future.isSuccess()).andReturn(true);
expect(future.get()).andReturn(null); expect(future.get()).andReturn(null);
expect(p1.setSuccess(null)).andReturn(p1); expect(p1.trySuccess(null)).andReturn(true);
expect(p2.setSuccess(null)).andReturn(p2); expect(p2.trySuccess(null)).andReturn(true);
replay(p1, p2, future); replay(p1, p2, future);
notifier.operationComplete(future); notifier.operationComplete(future);
@ -79,8 +79,8 @@ public class PromiseNotifierTest {
Throwable t = createStrictMock(Throwable.class); Throwable t = createStrictMock(Throwable.class);
expect(future.isSuccess()).andReturn(false); expect(future.isSuccess()).andReturn(false);
expect(future.cause()).andReturn(t); expect(future.cause()).andReturn(t);
expect(p1.setFailure(t)).andReturn(p1); expect(p1.tryFailure(t)).andReturn(true);
expect(p2.setFailure(t)).andReturn(p2); expect(p2.tryFailure(t)).andReturn(true);
replay(p1, p2, future); replay(p1, p2, future);
notifier.operationComplete(future); notifier.operationComplete(future);

View File

@ -29,6 +29,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.channel.PendingWriteQueue; import io.netty.channel.PendingWriteQueue;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.DefaultPromise;
@ -1367,7 +1368,12 @@ public class SslHandler extends ByteToMessageDecoder {
@Override @Override
public void run() { public void run() {
logger.warn("{} Last write attempt timed out; force-closing the connection.", ctx.channel()); logger.warn("{} Last write attempt timed out; force-closing the connection.", ctx.channel());
ctx.close(promise);
// We notify the promise in the TryNotifyListener as there is a "race" where the close(...) call
// by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
// this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
// IllegalStateException.
ctx.close(ctx.newPromise()).addListener(new ChannelPromiseNotifier(promise));
} }
}, closeNotifyTimeoutMillis, TimeUnit.MILLISECONDS); }, closeNotifyTimeoutMillis, TimeUnit.MILLISECONDS);
} else { } else {
@ -1384,7 +1390,12 @@ public class SslHandler extends ByteToMessageDecoder {
} }
// Trigger the close in all cases to make sure the promise is notified // Trigger the close in all cases to make sure the promise is notified
// See https://github.com/netty/netty/issues/2358 // See https://github.com/netty/netty/issues/2358
ctx.close(promise); //
// We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
// by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
// this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
// IllegalStateException.
ctx.close(ctx.newPromise()).addListener(new ChannelPromiseNotifier(promise));
} }
}); });
} }

View File

@ -32,5 +32,4 @@ public final class ChannelPromiseNotifier
public ChannelPromiseNotifier(ChannelPromise... promises) { public ChannelPromiseNotifier(ChannelPromise... promises) {
super(promises); super(promises);
} }
} }