Change visibility of DefaultPromise to push users to use EventExecutor methods (#11646)

Motivation:

While we use DefaultPromise as our implementation of Promise and Future users should not really use it directly. Users should always use the EventExecutor / EventLoop to create a Promise / Future.

Modifications:

- Change static Promise methods to be package-private
- Add default implementations for Promise and Future creation to EventExecutor
- Change public constructor to protected
- Remove usage of DefaultPromise in our tests

Result:

Less likely users will depend on the actual Promise implementation
This commit is contained in:
Norman Maurer 2021-09-02 19:11:01 +02:00 committed by GitHub
parent 0cb4cc4e49
commit cba3b4dd57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 28 additions and 54 deletions

View File

@ -20,7 +20,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
@ -129,7 +128,7 @@ public class DefaultHttp2ConnectionDecoderTest {
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
Promise<Void> promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE);
Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();
final AtomicInteger headersReceivedState = new AtomicInteger();
when(channel.isActive()).thenReturn(true);

View File

@ -26,7 +26,6 @@ import io.netty.channel.DefaultChannelConfig;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.Http2Exception.ShutdownHint;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -54,7 +53,6 @@ import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.util.CharsetUtil.US_ASCII;
import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.concurrent.DefaultPromise.newSuccessfulPromise;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -141,7 +139,7 @@ public class Http2ConnectionHandlerTest {
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE);
promise = ImmediateEventExecutor.INSTANCE.newPromise();
when(channel.metadata()).thenReturn(new ChannelMetadata(false));
DefaultChannelConfig config = new DefaultChannelConfig(channel);
@ -193,8 +191,8 @@ public class Http2ConnectionHandlerTest {
when(ctx.channel()).thenReturn(channel);
when(ctx.newFailedFuture(any(Throwable.class)))
.thenAnswer(invocationOnMock ->
DefaultPromise.newFailedPromise(executor, invocationOnMock.getArgument(0)));
when(ctx.newSucceededFuture()).thenReturn(newSuccessfulPromise(executor, (Void) null).asFuture());
ImmediateEventExecutor.INSTANCE.newFailedFuture(invocationOnMock.getArgument(0)));
when(ctx.newSucceededFuture()).thenReturn(ImmediateEventExecutor.INSTANCE.newSucceededFuture(null));
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
when(ctx.executor()).thenReturn(executor);
@ -204,9 +202,9 @@ public class Http2ConnectionHandlerTest {
return null;
}).when(ctx).fireChannelRead(any());
doAnswer((Answer<Future<Void>>) in ->
newSuccessfulPromise(executor, (Void) null).asFuture()).when(ctx).write(any());
ImmediateEventExecutor.INSTANCE.newSucceededFuture(null)).when(ctx).write(any());
doAnswer((Answer<Future<Void>>) in ->
newSuccessfulPromise(executor, (Void) null).asFuture()).when(ctx).close();
ImmediateEventExecutor.INSTANCE.newSucceededFuture(null)).when(ctx).close();
}
private Http2ConnectionHandler newHandler() throws Exception {
@ -608,7 +606,7 @@ public class Http2ConnectionHandlerTest {
handler.goAway(ctx, STREAM_ID + 2, errorCode, data.retain());
verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID + 2), eq(errorCode), eq(data));
verify(connection).goAwaySent(eq(STREAM_ID + 2), eq(errorCode), eq(data));
promise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE);
promise = ImmediateEventExecutor.INSTANCE.newPromise();
handler.goAway(ctx, STREAM_ID, errorCode, data);
verify(frameWriter).writeGoAway(eq(ctx), eq(STREAM_ID), eq(errorCode), eq(data));
verify(connection).goAwaySent(eq(STREAM_ID), eq(errorCode), eq(data));

View File

@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.DefaultMessageSizeEstimator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
@ -264,6 +263,6 @@ public class Http2ControlFrameLimitEncoderTest {
}
private static Promise<Void> newPromise() {
return new DefaultPromise<>(ImmediateEventExecutor.INSTANCE);
return ImmediateEventExecutor.INSTANCE.newPromise();
}
}

View File

@ -35,7 +35,6 @@ import io.netty.util.AbstractReferenceCounted;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
@ -613,7 +612,7 @@ public class Http2FrameCodecTest {
assertNotNull(stream);
assertFalse(isStreamIdValid(stream.id()));
final Promise<Void> listenerExecuted = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
final Promise<Void> listenerExecuted = GlobalEventExecutor.INSTANCE.newPromise();
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), false).stream(stream))
.addListener(future -> {

View File

@ -23,10 +23,10 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AsciiString;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -99,11 +99,11 @@ public class Http2FrameRoundtripTest {
when(ctx.executor()).thenReturn(executor);
when(ctx.channel()).thenReturn(channel);
doAnswer((Answer<Future<Void>>) in ->
DefaultPromise.newSuccessfulPromise(executor, (Void) null).asFuture()).when(ctx).write(any());
ImmediateEventExecutor.INSTANCE.newSucceededFuture(null)).when(ctx).write(any());
doAnswer((Answer<ByteBuf>) in -> Unpooled.buffer()).when(alloc).buffer();
doAnswer((Answer<ByteBuf>) in -> Unpooled.buffer((Integer) in.getArguments()[0])).when(alloc).buffer(anyInt());
doAnswer((Answer<Promise<Void>>) invocation ->
new DefaultPromise<>(GlobalEventExecutor.INSTANCE)).when(ctx).newPromise();
GlobalEventExecutor.INSTANCE.newPromise()).when(ctx).newPromise();
writer = new DefaultHttp2FrameWriter(new DefaultHttp2HeadersEncoder(NEVER_SENSITIVE, newTestEncoder()));
reader = new DefaultHttp2FrameReader(new DefaultHttp2HeadersDecoder(false, newTestDecoder()));

View File

@ -27,8 +27,8 @@ import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalHandler;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -145,7 +145,7 @@ public class Http2StreamChannelBootstrapTest {
when(ctx.executor()).thenReturn(executor);
when(ctx.handler()).thenReturn(handler);
Promise<Http2StreamChannel> promise = new DefaultPromise<>(mock(EventExecutor.class));
Promise<Http2StreamChannel> promise = ImmediateEventExecutor.INSTANCE.newPromise();
bootstrap.open0(ctx, promise);
assertThat(promise.isDone(), is(true));
assertThat(promise.cause(), is(instanceOf(IllegalStateException.class)));

View File

@ -75,11 +75,6 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
return Collections.emptyList();
}
@Override
public <V> Promise<V> newPromise() {
return new DefaultPromise<>(this);
}
@Override
public <V> Future<V> newSucceededFuture(V result) {
if (result == null) {
@ -87,13 +82,7 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
Future<V> f = (Future<V>) successfulVoidFuture;
return f;
}
return DefaultPromise.newSuccessfulPromise(this, result).asFuture();
}
@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
Promise<V> promise = DefaultPromise.newFailedPromise(this, cause);
return promise.asFuture();
return EventExecutor.super.newSucceededFuture(result);
}
@Override

View File

@ -69,7 +69,7 @@ public class DefaultPromise<V> implements Promise<V>, Future<V> {
/**
* Creates a new unfulfilled promise.
*
* It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
* This constructor is only meant to be used by sub-classes.
*
* @param executor
* The {@link EventExecutor} which is used to notify the promise once it is complete.
@ -77,7 +77,7 @@ public class DefaultPromise<V> implements Promise<V>, Future<V> {
* The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
* depth exceeds a threshold.
*/
public DefaultPromise(EventExecutor executor) {
protected DefaultPromise(EventExecutor executor) {
this.executor = requireNonNull(executor, "executor");
stage = new DefaultFutureCompletionStage<>(this);
}
@ -92,7 +92,7 @@ public class DefaultPromise<V> implements Promise<V>, Future<V> {
* depth exceeds a threshold.
* @param result The result of the successful promise.
*/
public static <V> Promise<V> newSuccessfulPromise(EventExecutor executor, V result) {
static <V> Promise<V> newSuccessfulPromise(EventExecutor executor, V result) {
return new DefaultPromise<>(executor, result);
}
@ -106,7 +106,7 @@ public class DefaultPromise<V> implements Promise<V>, Future<V> {
* depth exceeds a threshold.
* @param cause The {@link Throwable} that caused the failure of the returned promise.
*/
public static <V> Promise<V> newFailedPromise(EventExecutor executor, Throwable cause) {
static <V> Promise<V> newFailedPromise(EventExecutor executor, Throwable cause) {
return new DefaultPromise<>(cause, executor);
}

View File

@ -44,19 +44,25 @@ public interface EventExecutor extends EventExecutorGroup {
/**
* Return a new {@link Promise}.
*/
<V> Promise<V> newPromise();
default <V> Promise<V> newPromise() {
return new DefaultPromise<>(this);
}
/**
* Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newSucceededFuture(V result);
default <V> Future<V> newSucceededFuture(V result) {
return DefaultPromise.newSuccessfulPromise(this, result).asFuture();
}
/**
* Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
* will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newFailedFuture(Throwable cause);
default <V> Future<V> newFailedFuture(Throwable cause) {
return DefaultPromise.<V>newFailedPromise(this, cause).asFuture();
}
}

View File

@ -92,22 +92,6 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
return false;
}
@Override
public <V> Promise<V> newPromise() {
return new DefaultPromise<>(this);
}
@Override
public <V> Future<V> newSucceededFuture(V result) {
return DefaultPromise.newSuccessfulPromise(this, result).asFuture();
}
@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
Promise<V> promise = DefaultPromise.newFailedPromise(this, cause);
return promise.asFuture();
}
@Override
public boolean isShuttingDown() {
return isShutdown();