Remove code that accounts for changing EventExecutors in DefaultPromise (#8996)

Motivation:

DefaultPromise requires an EventExecutor which provides the thread to notify listeners on and this EventExecutor can never change. We can remove the code that supported the possibility of a changing the executor as this is not possible anymore.

Modifications:

- Remove constructor which allowed to construct a *Promise without an EventExecutor
- Remove extra state
- Adjusted SslHandler and ProxyHandler for new code

Result:

Fixes https://github.com/netty/netty/issues/8517.
This commit is contained in:
Norman Maurer 2019-04-03 10:36:55 +02:00 committed by GitHub
parent d0beb8dea1
commit bace8a1cce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 45 additions and 73 deletions

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.concurrent.ImmediateEventExecutor;
import junit.framework.AssertionFailedError; import junit.framework.AssertionFailedError;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -122,7 +123,7 @@ public class DefaultHttp2ConnectionDecoderTest {
public void setup() throws Exception { public void setup() throws Exception {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
promise = new DefaultChannelPromise(channel); promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
final AtomicInteger headersReceivedState = new AtomicInteger(); final AtomicInteger headersReceivedState = new AtomicInteger();
when(channel.isActive()).thenReturn(true); when(channel.isActive()).thenReturn(true);

View File

@ -599,7 +599,7 @@ public class Http2ConnectionHandlerTest {
verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID + 2), eq(errorCode), eq(data), verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID + 2), eq(errorCode), eq(data),
eq(promise)); eq(promise));
verify(connection).goAwaySent(eq(STREAM_ID + 2), eq(errorCode), eq(data)); verify(connection).goAwaySent(eq(STREAM_ID + 2), eq(errorCode), eq(data));
promise = new DefaultChannelPromise(channel); promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
handler.goAway(ctx, STREAM_ID, errorCode, data, promise); handler.goAway(ctx, STREAM_ID, errorCode, data, promise);
verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID), eq(errorCode), eq(data), eq(promise)); verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID), eq(errorCode), eq(data), eq(promise));
verify(connection).goAwaySent(eq(STREAM_ID), eq(errorCode), eq(data)); verify(connection).goAwaySent(eq(STREAM_ID), eq(errorCode), eq(data));

View File

@ -30,8 +30,6 @@ public class DefaultProgressivePromise<V> extends DefaultPromise<V> implements P
super(executor); super(executor);
} }
protected DefaultProgressivePromise() { /* only for subclasses */ }
@Override @Override
public ProgressivePromise<V> setProgress(long progress, long total) { public ProgressivePromise<V> setProgress(long progress, long total) {
if (total < 0) { if (total < 0) {

View File

@ -58,12 +58,6 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
*/ */
private short waiters; private short waiters;
/**
* Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
* executor changes.
*/
private boolean notifyingListeners;
/** /**
* Creates a new instance. * Creates a new instance.
* *
@ -80,14 +74,6 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
this.executor = requireNonNull(executor, "executor"); this.executor = requireNonNull(executor, "executor");
} }
/**
* See {@link #executor()} for expectations of the executor.
*/
protected DefaultPromise() {
// only for subclasses
executor = null;
}
@Override @Override
public Promise<V> setSuccess(V result) { public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) { if (setSuccess0(result)) {
@ -376,13 +362,16 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
* depth exceeds a threshold. * depth exceeds a threshold.
* @return The executor used to notify listeners when this promise is complete. * @return The executor used to notify listeners when this promise is complete.
*/ */
protected EventExecutor executor() { protected final EventExecutor executor() {
return executor; return executor;
} }
protected void checkDeadLock() { protected void checkDeadLock() {
EventExecutor e = executor(); checkDeadLock(executor);
if (e != null && e.inEventLoop()) { }
protected final void checkDeadLock(EventExecutor executor) {
if (executor.inEventLoop()) {
throw new BlockingOperationException(toString()); throw new BlockingOperationException(toString());
} }
} }
@ -451,11 +440,10 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private void notifyListenersNow() { private void notifyListenersNow() {
Object listeners; Object listeners;
synchronized (this) { synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners. // Only proceed if there are listeners to notify.
if (notifyingListeners || this.listeners == null) { if (this.listeners == null) {
return; return;
} }
notifyingListeners = true;
listeners = this.listeners; listeners = this.listeners;
this.listeners = null; this.listeners = null;
} }
@ -467,9 +455,6 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
} }
synchronized (this) { synchronized (this) {
if (this.listeners == null) { if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return; return;
} }
listeners = this.listeners; listeners = this.listeners;

View File

@ -26,8 +26,9 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue; import io.netty.channel.PendingWriteQueue;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -59,7 +60,7 @@ public abstract class ProxyHandler implements ChannelHandler {
private boolean finished; private boolean finished;
private boolean suppressChannelReadComplete; private boolean suppressChannelReadComplete;
private boolean flushedPrematurely; private boolean flushedPrematurely;
private final LazyChannelPromise connectPromise = new LazyChannelPromise(); private final Promise<Channel> connectPromise = new LazyPromise();
private ScheduledFuture<?> connectTimeoutFuture; private ScheduledFuture<?> connectTimeoutFuture;
private final ChannelFutureListener writeListener = future -> { private final ChannelFutureListener writeListener = future -> {
if (!future.isSuccess()) { if (!future.isSuccess()) {
@ -440,13 +441,19 @@ public abstract class ProxyHandler implements ChannelHandler {
pendingWrites.add(msg, promise); pendingWrites.add(msg, promise);
} }
private final class LazyChannelPromise extends DefaultPromise<Channel> { private final class LazyPromise extends DefaultPromise<Channel> {
LazyPromise() {
super(ImmediateEventExecutor.INSTANCE);
}
@Override @Override
protected EventExecutor executor() { protected void checkDeadLock() {
if (ctx == null) { if (ctx == null) {
throw new IllegalStateException(); // If ctx is null the handlerAdded(...) callback was not called yet.
return;
} }
return ctx.executor(); checkDeadLock(ctx.executor());
} }
} }
} }

View File

@ -40,6 +40,7 @@ import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.ImmediateExecutor; import io.netty.util.concurrent.ImmediateExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.concurrent.PromiseNotifier;
@ -392,8 +393,8 @@ public class SslHandler extends ByteToMessageDecoder {
private boolean handshakeStarted; private boolean handshakeStarted;
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites; private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
private Promise<Channel> handshakePromise = new LazyChannelPromise(); private Promise<Channel> handshakePromise = new LazyPromise();
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise(); private final Promise<Channel> sslClosePromise = new LazyPromise();
/** /**
* Set by wrap*() methods when something is produced. * Set by wrap*() methods when something is produced.
@ -2152,14 +2153,10 @@ public class SslHandler extends ByteToMessageDecoder {
return false; return false;
} }
private final class LazyChannelPromise extends DefaultPromise<Channel> { private final class LazyPromise extends DefaultPromise<Channel> {
@Override LazyPromise() {
protected EventExecutor executor() { super(ImmediateEventExecutor.INSTANCE);
if (ctx == null) {
throw new IllegalStateException();
}
return ctx.executor();
} }
@Override @Override
@ -2169,11 +2166,10 @@ public class SslHandler extends ByteToMessageDecoder {
// method was called from another Thread then the one that is used by ctx.executor(). We need to // method was called from another Thread then the one that is used by ctx.executor(). We need to
// guard against this as a user can see a race if handshakeFuture().sync() is called but the // guard against this as a user can see a race if handshakeFuture().sync() is called but the
// handlerAdded(..) method was not yet as it is called from the EventExecutor of the // handlerAdded(..) method was not yet as it is called from the EventExecutor of the
// ChannelHandlerContext. If we not guard against this super.checkDeadLock() would cause an // ChannelHandlerContext.
// IllegalStateException when trying to call executor().
return; return;
} }
super.checkDeadLock(); checkDeadLock(ctx.executor());
} }
} }
} }

View File

@ -22,6 +22,7 @@ import io.netty.channel.socket.ChannelOutputShutdownEvent;
import io.netty.channel.socket.ChannelOutputShutdownException; import io.netty.channel.socket.ChannelOutputShutdownException;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
@ -63,7 +64,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final ChannelPipeline pipeline; private final ChannelPipeline pipeline;
private final ChannelFuture succeedFuture; private final ChannelFuture succeedFuture;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false); private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this); private final CloseFuture closeFuture;
private volatile SocketAddress localAddress; private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress; private volatile SocketAddress remoteAddress;
@ -85,8 +86,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected AbstractChannel(Channel parent, EventLoop eventLoop) { protected AbstractChannel(Channel parent, EventLoop eventLoop) {
this.parent = parent; this.parent = parent;
this.eventLoop = validateEventLoop(eventLoop); this.eventLoop = validateEventLoop(eventLoop);
id = newId(); closeFuture = new CloseFuture(this, eventLoop);
succeedFuture = new SucceededChannelFuture(this, eventLoop); succeedFuture = new SucceededChannelFuture(this, eventLoop);
id = newId();
unsafe = newUnsafe(); unsafe = newUnsafe();
pipeline = newChannelPipeline(); pipeline = newChannelPipeline();
} }
@ -100,8 +102,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected AbstractChannel(Channel parent, EventLoop eventLoop, ChannelId id) { protected AbstractChannel(Channel parent, EventLoop eventLoop, ChannelId id) {
this.parent = parent; this.parent = parent;
this.eventLoop = validateEventLoop(eventLoop); this.eventLoop = validateEventLoop(eventLoop);
this.id = id; closeFuture = new CloseFuture(this, eventLoop);
succeedFuture = new SucceededChannelFuture(this, eventLoop); succeedFuture = new SucceededChannelFuture(this, eventLoop);
this.id = id;
unsafe = newUnsafe(); unsafe = newUnsafe();
pipeline = newChannelPipeline(); pipeline = newChannelPipeline();
} }
@ -1100,8 +1103,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
static final class CloseFuture extends DefaultChannelPromise { static final class CloseFuture extends DefaultChannelPromise {
CloseFuture(AbstractChannel ch) { CloseFuture(AbstractChannel ch, EventExecutor eventExecutor) {
super(ch); super(ch, eventExecutor);
} }
@Override @Override

View File

@ -21,6 +21,8 @@ import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import java.util.Objects;
/** /**
* The default {@link ChannelProgressivePromise} implementation. It is recommended to use * The default {@link ChannelProgressivePromise} implementation. It is recommended to use
* {@link Channel#newProgressivePromise()} to create a new {@link ChannelProgressivePromise} rather than calling the * {@link Channel#newProgressivePromise()} to create a new {@link ChannelProgressivePromise} rather than calling the
@ -39,7 +41,7 @@ public class DefaultChannelProgressivePromise
* the {@link Channel} associated with this future * the {@link Channel} associated with this future
*/ */
public DefaultChannelProgressivePromise(Channel channel) { public DefaultChannelProgressivePromise(Channel channel) {
this.channel = channel; this(channel, channel.eventLoop());
} }
/** /**
@ -50,17 +52,7 @@ public class DefaultChannelProgressivePromise
*/ */
public DefaultChannelProgressivePromise(Channel channel, EventExecutor executor) { public DefaultChannelProgressivePromise(Channel channel, EventExecutor executor) {
super(executor); super(executor);
this.channel = channel; this.channel = Objects.requireNonNull(channel, "channel");
}
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
} else {
return e;
}
} }
@Override @Override

View File

@ -39,7 +39,7 @@ public class DefaultChannelPromise extends DefaultPromise<Void> implements Chann
* the {@link Channel} associated with this future * the {@link Channel} associated with this future
*/ */
public DefaultChannelPromise(Channel channel) { public DefaultChannelPromise(Channel channel) {
this.channel = requireNonNull(channel, "channel"); this(channel, channel.eventLoop());
} }
/** /**
@ -53,16 +53,6 @@ public class DefaultChannelPromise extends DefaultPromise<Void> implements Chann
this.channel = requireNonNull(channel, "channel"); this.channel = requireNonNull(channel, "channel");
} }
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
} else {
return e;
}
}
@Override @Override
public Channel channel() { public Channel channel() {
return channel; return channel;