From 782d70281ef63817c1d5a263e03a988b85663e7f Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 31 Aug 2021 16:06:34 +0200 Subject: [PATCH] Reduce reliance on ScheduledFuture (#11635) Motivation: If we don't need the scheduled future, then it will be one less complication when we change Netty Future to no longer extend JDK Future. It would also result in fewer elements in our API. Modification: There was only one real use of ScheduledFuture in our code, in Cache. This has been changed to wrap an ordinary future with a deadline for implementing the Delayed interface. All other places were effectively overspecifying by relying on ScheduledFuture. A few places were also specifying JDK Futures - these have been changed to specify Netty Futures. Result: Reduced dependency on the ScheduledFuture interfaces. --- .../websocketx/WebSocketProtocolHandler.java | 3 +- .../codec/http2/Http2ConnectionHandler.java | 2 +- .../io/netty/handler/proxy/ProxyHandler.java | 3 +- .../java/io/netty/handler/ssl/SslHandler.java | 7 +- .../handler/timeout/IdleStateHandler.java | 9 +- .../handler/timeout/WriteTimeoutHandler.java | 3 +- .../netty/handler/traffic/TrafficCounter.java | 6 +- .../handler/timeout/IdleStateHandlerTest.java | 4 +- .../epoll/EpollSocketChannelBenchmark.java | 3 +- ...nnableScheduledFutureAdapterBenchmark.java | 3 +- .../java/io/netty/resolver/dns/Cache.java | 170 ++++++++++++++++-- .../netty/resolver/dns/DnsQueryContext.java | 7 +- .../NettyBlockHoundIntegrationTest.java | 3 +- .../channel/epoll/AbstractEpollChannel.java | 6 +- .../channel/kqueue/AbstractKQueueChannel.java | 4 +- .../netty/channel/nio/AbstractNioChannel.java | 6 +- .../channel/embedded/EmbeddedChannelTest.java | 34 ++-- 17 files changed, 197 insertions(+), 76 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java index 7f4df07b15..54d0bdb593 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java @@ -21,7 +21,6 @@ import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import java.nio.channels.ClosedChannelException; import java.util.concurrent.TimeUnit; @@ -119,7 +118,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder timeoutTask = ctx.executor().schedule(() -> { + Future timeoutTask = ctx.executor().schedule(() -> { if (!closeSent.isDone()) { closeSent.tryFailure(buildHandshakeException("send close frame timed out")); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 72f38331af..0411a8e24c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -881,7 +881,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http private static final class ClosingChannelFutureListener implements FutureListener { private final ChannelHandlerContext ctx; private final Promise promise; - private final ScheduledFuture timeoutTask; + private final Future timeoutTask; private boolean closed; ClosingChannelFutureListener(ChannelHandlerContext ctx, Promise promise) { diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java index 7f953fc91c..22e3a8004a 100644 --- a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java @@ -26,7 +26,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -60,7 +59,7 @@ public abstract class ProxyHandler implements ChannelHandler { private boolean suppressChannelReadComplete; private boolean flushedPrematurely; private final Promise connectPromise = new LazyPromise(); - private ScheduledFuture connectTimeoutFuture; + private Future connectTimeoutFuture; private final FutureListener writeListener = future -> { if (future.isFailed()) { setConnectFailure(future.cause()); diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 485ea169c5..4f7034d3e3 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -59,7 +59,6 @@ import java.nio.channels.DatagramChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -2022,7 +2021,7 @@ public class SslHandler extends ByteToMessageDecoder { return; } - final ScheduledFuture timeoutFuture = ctx.executor().schedule(() -> { + Future timeoutFuture = ctx.executor().schedule(() -> { if (localHandshakePromise.isDone()) { return; } @@ -2066,7 +2065,7 @@ public class SslHandler extends ByteToMessageDecoder { return; } - final ScheduledFuture timeoutFuture; + Future timeoutFuture; if (!flushFuture.isDone()) { long closeNotifyTimeout = closeNotifyFlushTimeoutMillis; if (closeNotifyTimeout > 0) { @@ -2101,7 +2100,7 @@ public class SslHandler extends ByteToMessageDecoder { promise.trySuccess(null); } } else { - final ScheduledFuture closeNotifyReadTimeoutFuture; + Future closeNotifyReadTimeoutFuture; if (!sslClosePromise.isDone()) { closeNotifyReadTimeoutFuture = ctx.executor().schedule(() -> { diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index d7fa4e887a..258c5525bf 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static java.util.Objects.requireNonNull; @@ -110,15 +109,15 @@ public class IdleStateHandler implements ChannelHandler { private final long writerIdleTimeNanos; private final long allIdleTimeNanos; - private ScheduledFuture readerIdleTimeout; + private Future readerIdleTimeout; private long lastReadTime; private boolean firstReaderIdleEvent = true; - private ScheduledFuture writerIdleTimeout; + private Future writerIdleTimeout; private long lastWriteTime; private boolean firstWriterIdleEvent = true; - private ScheduledFuture allIdleTimeout; + private Future allIdleTimeout; private boolean firstAllIdleEvent = true; private byte state; // 0 - none, 1 - initialized, 2 - destroyed @@ -341,7 +340,7 @@ public class IdleStateHandler implements ChannelHandler { /** * This method is visible for testing! */ - ScheduledFuture schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { + Future schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { return ctx.executor().schedule(task, delay, unit); } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index a18fd2251d..e5131bbc92 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -23,7 +23,6 @@ import io.netty.channel.ChannelInitializer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static java.util.Objects.requireNonNull; @@ -191,7 +190,7 @@ public class WriteTimeoutHandler implements ChannelHandler { WriteTimeoutTask prev; WriteTimeoutTask next; - ScheduledFuture scheduledFuture; + Future scheduledFuture; WriteTimeoutTask(ChannelHandlerContext ctx, Future future) { this.ctx = ctx; this.future = future; diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java index 217d8c3474..283e01166e 100644 --- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java @@ -15,9 +15,6 @@ */ package io.netty.handler.traffic; -import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -26,6 +23,9 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; +import static java.util.Objects.requireNonNull; + /** * Counts the number of read and written bytes for rate-limiting traffic. diff --git a/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java index 7da9468f8e..1e770abaea 100644 --- a/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java @@ -21,11 +21,11 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; @@ -363,7 +363,7 @@ public class IdleStateHandlerTest { } @Override - ScheduledFuture schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { + Future schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { this.task = task; this.delayInNanos = unit.toNanos(delay); return null; diff --git a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java index 4df4f6fb8d..36f3ee9e0d 100644 --- a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java @@ -30,7 +30,6 @@ import io.netty.channel.epoll.EpollSocketChannel; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.GroupThreads; import org.openjdk.jmh.annotations.Setup; @@ -45,7 +44,7 @@ public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark { private Channel serverChan; private Channel chan; private ByteBuf abyte; - private ScheduledFuture future; + private Future future; @Setup public void setup() throws Exception { diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java index 73b0be8324..8035bfc663 100644 --- a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java @@ -20,7 +20,6 @@ import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.local.LocalHandler; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ScheduledFuture; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Param; @@ -47,7 +46,7 @@ public class RunnableScheduledFutureAdapterBenchmark extends AbstractMicrobenchm @Param({ "100", "1000", "10000", "100000" }) int num; - final List> futures = new ArrayList<>(); + final List> futures = new ArrayList<>(); @Setup(Level.Invocation) public void reset() { diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java index cd0b0fddef..263d7ef4aa 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java @@ -16,6 +16,12 @@ package io.netty.resolver.dns; import io.netty.channel.EventLoop; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureCompletionStage; +import io.netty.util.concurrent.FutureContextListener; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; import java.util.ArrayList; import java.util.Collections; @@ -23,13 +29,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Delayed; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Function; import static java.util.Collections.singletonList; @@ -39,10 +47,84 @@ import static java.util.Collections.singletonList; * @param */ abstract class Cache { - private static final AtomicReferenceFieldUpdater FUTURE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(Cache.Entries.class, ScheduledFuture.class, "expirationFuture"); + private static final AtomicReferenceFieldUpdater FUTURE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Cache.Entries.class, FutureAndDelay.class, "expirationFuture"); - private static final ScheduledFuture CANCELLED = new ScheduledFuture() { + private static final Future CANCELLED_FUTURE = new Future() { + @Override + public boolean isSuccess() { + return false; + } + + @Override + public boolean isFailed() { + return true; + } + + @Override + public boolean isCancellable() { + return false; + } + + @Override + public Throwable cause() { + return new CancellationException(); + } + + @Override + public Future addListener(FutureListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public Future addListener(C context, FutureContextListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public Future sync() throws InterruptedException { + return this; + } + + @Override + public Future syncUninterruptibly() { + return this; + } + + @Override + public Future await() throws InterruptedException { + return this; + } + + @Override + public Future awaitUninterruptibly() { + return this; + } + + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return true; + } + + @Override + public boolean await(long timeoutMillis) throws InterruptedException { + return true; + } + + @Override + public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { + return true; + } + + @Override + public boolean awaitUninterruptibly(long timeoutMillis) { + return true; + } + + @Override + public Object getNow() { + return null; + } @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -50,14 +132,7 @@ abstract class Cache { } @Override - public long getDelay(TimeUnit unit) { - // We ignore unit and always return the minimum value to ensure the TTL of the cancelled marker is - // the smallest. - return Long.MIN_VALUE; - } - - @Override - public int compareTo(Delayed o) { + public EventExecutor executor() { throw new UnsupportedOperationException(); } @@ -80,7 +155,28 @@ abstract class Cache { public Object get(long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } + + @Override + public FutureCompletionStage asStage() { + throw new UnsupportedOperationException(); + } + + @Override + public Future map(Function mapper) { + throw new UnsupportedOperationException(); + } + + @Override + public Future flatMap(Function> mapper) { + throw new UnsupportedOperationException(); + } + + @Override + public Future cascadeTo(Promise promise) { + throw new UnsupportedOperationException(); + } }; + private static final FutureAndDelay CANCELLED = new FutureAndDelay(CANCELLED_FUTURE, Integer.MIN_VALUE); // Two years are supported by all our EventLoop implementations and so safe to use as maximum. // See also: https://github.com/netty/netty/commit/b47fb817991b42ec8808c7d26538f3f2464e1fa6 @@ -163,7 +259,7 @@ abstract class Cache { private final String hostname; // Needs to be package-private to be able to access it via the AtomicReferenceFieldUpdater - volatile ScheduledFuture expirationFuture; + volatile FutureAndDelay expirationFuture; Entries(String hostname) { super(Collections.emptyList()); @@ -235,18 +331,18 @@ abstract class Cache { // We currently don't calculate a new TTL when we need to retry the CAS as we don't expect this to // be invoked very concurrently and also we use SECONDS anyway. If this ever becomes a problem // we can reconsider. - ScheduledFuture oldFuture = FUTURE_UPDATER.get(this); + FutureAndDelay oldFuture = FUTURE_UPDATER.get(this); if (oldFuture == null || oldFuture.getDelay(TimeUnit.SECONDS) > ttl) { - ScheduledFuture newFuture = loop.schedule(this, ttl, TimeUnit.SECONDS); + Future newFuture = loop.schedule(this, ttl, TimeUnit.SECONDS); // It is possible that // 1. task will fire in between this line, or // 2. multiple timers may be set if there is concurrency // (1) Shouldn't be a problem because we will fail the CAS and then the next loop will see CANCELLED // so the ttl will not be less, and we will bail out of the loop. // (2) This is a trade-off to avoid concurrency resulting in contention on a synchronized block. - if (FUTURE_UPDATER.compareAndSet(this, oldFuture, newFuture)) { + if (FUTURE_UPDATER.compareAndSet(this, oldFuture, new FutureAndDelay(newFuture, ttl))) { if (oldFuture != null) { - oldFuture.cancel(true); + oldFuture.cancel(); } break; } else { @@ -265,9 +361,9 @@ abstract class Cache { return false; } - ScheduledFuture expirationFuture = FUTURE_UPDATER.getAndSet(this, CANCELLED); + FutureAndDelay expirationFuture = FUTURE_UPDATER.getAndSet(this, CANCELLED); if (expirationFuture != null) { - expirationFuture.cancel(false); + expirationFuture.cancel(); } return true; @@ -289,4 +385,40 @@ abstract class Cache { clearAndCancel(); } } + + private static final class FutureAndDelay implements Delayed { + final Future future; + final long deadlineNanos; + + private FutureAndDelay(Future future, int ttl) { + this.future = Objects.requireNonNull(future, "future"); + deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(ttl); + } + + void cancel() { + future.cancel(false); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed other) { + return Long.compare(deadlineNanos, other.getDelay(TimeUnit.NANOSECONDS)); + } + + @Override + public boolean equals(Object o) { + return o instanceof FutureAndDelay && compareTo((FutureAndDelay) o) == 0; + } + + @Override + public int hashCode() { + int result = future.hashCode(); + result = 31 * result + (int) (deadlineNanos ^ deadlineNanos >>> 32); + return result; + } + } } diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java index d95fce064b..314fbdf4ea 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java @@ -26,7 +26,6 @@ import io.netty.handler.codec.dns.DnsSection; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -48,7 +47,7 @@ abstract class DnsQueryContext implements FutureListener timeoutFuture; + private volatile Future timeoutFuture; DnsQueryContext(DnsNameResolver parent, InetSocketAddress nameServerAddr, @@ -146,7 +145,7 @@ abstract class DnsQueryContext implements FutureListener writeFuture, Promise writePromise) { + private void onQueryWriteCompletion(Future writeFuture, Promise writePromise) { if (writeFuture.isFailed()) { writePromise.setFailure(writeFuture.cause()); tryFailure("failed to send a query via " + protocol(), writeFuture.cause(), false); @@ -215,7 +214,7 @@ abstract class DnsQueryContext implements FutureListener> future) { // Cancel the timeout task. - final ScheduledFuture timeoutFuture = this.timeoutFuture; + final Future timeoutFuture = this.timeoutFuture; if (timeoutFuture != null) { this.timeoutFuture = null; timeoutFuture.cancel(false); diff --git a/transport-blockhound-tests/src/test/java/io/netty/util/internal/NettyBlockHoundIntegrationTest.java b/transport-blockhound-tests/src/test/java/io/netty/util/internal/NettyBlockHoundIntegrationTest.java index faa5354fa6..a8398757c0 100644 --- a/transport-blockhound-tests/src/test/java/io/netty/util/internal/NettyBlockHoundIntegrationTest.java +++ b/transport-blockhound-tests/src/test/java/io/netty/util/internal/NettyBlockHoundIntegrationTest.java @@ -45,7 +45,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateExecutor; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.internal.Hidden.NettyBlockHoundIntegration; import org.hamcrest.Matchers; @@ -141,7 +140,7 @@ public class NettyBlockHoundIntegrationTest { private static void testEventExecutorTakeTask(EventExecutor eventExecutor) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - ScheduledFuture f = eventExecutor.schedule(latch::countDown, 10, TimeUnit.MILLISECONDS); + Future f = eventExecutor.schedule(latch::countDown, 10, TimeUnit.MILLISECONDS); f.sync(); latch.await(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 75c24d70a0..ecf70734a3 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -36,6 +36,7 @@ import io.netty.channel.unix.IovArray; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.io.IOException; @@ -47,7 +48,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.UnresolvedAddressException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL; @@ -62,7 +62,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * connection attempts will fail. */ private Promise connectPromise; - private ScheduledFuture connectTimeoutFuture; + private Future connectTimeoutFuture; private SocketAddress requestedRemoteAddress; protected EpollRegistration registration; @@ -164,7 +164,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann connectPromise = null; } - ScheduledFuture future = connectTimeoutFuture; + Future future = connectTimeoutFuture; if (future != null) { future.cancel(false); connectTimeoutFuture = null; diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 5e03eb90de..1db93a573e 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -34,6 +34,7 @@ import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.io.IOException; @@ -45,7 +46,6 @@ import java.nio.channels.AlreadyConnectedException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.UnresolvedAddressException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL; @@ -59,7 +59,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan * connection attempts will fail. */ private Promise connectPromise; - private ScheduledFuture connectTimeoutFuture; + private Future connectTimeoutFuture; private SocketAddress requestedRemoteAddress; private KQueueRegistration registration; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 69223eba3f..da1655374b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -26,6 +26,7 @@ import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -36,7 +37,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -58,7 +58,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { * connection attempts will fail. */ private Promise connectPromise; - private ScheduledFuture connectTimeoutFuture; + private Future connectTimeoutFuture; private SocketAddress requestedRemoteAddress; /** @@ -460,7 +460,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { connectPromise = null; } - ScheduledFuture future = connectTimeoutFuture; + Future future = connectTimeoutFuture; if (future != null) { future.cancel(false); connectTimeoutFuture = null; diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index 934ab5a29c..369c4b7892 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -15,7 +15,21 @@ */ package io.netty.channel.embedded; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundInvoker; +import io.netty.channel.ChannelPipeline; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; @@ -25,22 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelId; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -124,7 +122,7 @@ public class EmbeddedChannelTest { public void testScheduling() throws Exception { EmbeddedChannel ch = new EmbeddedChannel(new ChannelHandler() { }); final CountDownLatch latch = new CountDownLatch(2); - ScheduledFuture future = ch.executor().schedule(latch::countDown, 1, TimeUnit.SECONDS); + Future future = ch.executor().schedule(latch::countDown, 1, TimeUnit.SECONDS); future.addListener(future1 -> latch.countDown()); long next = ch.runScheduledPendingTasks(); assertTrue(next > 0); @@ -138,7 +136,7 @@ public class EmbeddedChannelTest { @Test public void testScheduledCancelled() throws Exception { EmbeddedChannel ch = new EmbeddedChannel(new ChannelHandler() { }); - ScheduledFuture future = ch.executor().schedule(() -> { }, 1, TimeUnit.DAYS); + Future future = ch.executor().schedule(() -> { }, 1, TimeUnit.DAYS); ch.finish(); assertTrue(future.isCancelled()); }