diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java index 7f4df07b15..54d0bdb593 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java @@ -21,7 +21,6 @@ import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import java.nio.channels.ClosedChannelException; import java.util.concurrent.TimeUnit; @@ -119,7 +118,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder timeoutTask = ctx.executor().schedule(() -> { + Future timeoutTask = ctx.executor().schedule(() -> { if (!closeSent.isDone()) { closeSent.tryFailure(buildHandshakeException("send close frame timed out")); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 72f38331af..0411a8e24c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -881,7 +881,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http private static final class ClosingChannelFutureListener implements FutureListener { private final ChannelHandlerContext ctx; private final Promise promise; - private final ScheduledFuture timeoutTask; + private final Future timeoutTask; private boolean closed; ClosingChannelFutureListener(ChannelHandlerContext ctx, Promise promise) { diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java index 7f953fc91c..22e3a8004a 100644 --- a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java @@ -26,7 +26,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -60,7 +59,7 @@ public abstract class ProxyHandler implements ChannelHandler { private boolean suppressChannelReadComplete; private boolean flushedPrematurely; private final Promise connectPromise = new LazyPromise(); - private ScheduledFuture connectTimeoutFuture; + private Future connectTimeoutFuture; private final FutureListener writeListener = future -> { if (future.isFailed()) { setConnectFailure(future.cause()); diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 485ea169c5..4f7034d3e3 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -59,7 +59,6 @@ import java.nio.channels.DatagramChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -2022,7 +2021,7 @@ public class SslHandler extends ByteToMessageDecoder { return; } - final ScheduledFuture timeoutFuture = ctx.executor().schedule(() -> { + Future timeoutFuture = ctx.executor().schedule(() -> { if (localHandshakePromise.isDone()) { return; } @@ -2066,7 +2065,7 @@ public class SslHandler extends ByteToMessageDecoder { return; } - final ScheduledFuture timeoutFuture; + Future timeoutFuture; if (!flushFuture.isDone()) { long closeNotifyTimeout = closeNotifyFlushTimeoutMillis; if (closeNotifyTimeout > 0) { @@ -2101,7 +2100,7 @@ public class SslHandler extends ByteToMessageDecoder { promise.trySuccess(null); } } else { - final ScheduledFuture closeNotifyReadTimeoutFuture; + Future closeNotifyReadTimeoutFuture; if (!sslClosePromise.isDone()) { closeNotifyReadTimeoutFuture = ctx.executor().schedule(() -> { diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index d7fa4e887a..258c5525bf 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static java.util.Objects.requireNonNull; @@ -110,15 +109,15 @@ public class IdleStateHandler implements ChannelHandler { private final long writerIdleTimeNanos; private final long allIdleTimeNanos; - private ScheduledFuture readerIdleTimeout; + private Future readerIdleTimeout; private long lastReadTime; private boolean firstReaderIdleEvent = true; - private ScheduledFuture writerIdleTimeout; + private Future writerIdleTimeout; private long lastWriteTime; private boolean firstWriterIdleEvent = true; - private ScheduledFuture allIdleTimeout; + private Future allIdleTimeout; private boolean firstAllIdleEvent = true; private byte state; // 0 - none, 1 - initialized, 2 - destroyed @@ -341,7 +340,7 @@ public class IdleStateHandler implements ChannelHandler { /** * This method is visible for testing! */ - ScheduledFuture schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { + Future schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { return ctx.executor().schedule(task, delay, unit); } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index a18fd2251d..e5131bbc92 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -23,7 +23,6 @@ import io.netty.channel.ChannelInitializer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static java.util.Objects.requireNonNull; @@ -191,7 +190,7 @@ public class WriteTimeoutHandler implements ChannelHandler { WriteTimeoutTask prev; WriteTimeoutTask next; - ScheduledFuture scheduledFuture; + Future scheduledFuture; WriteTimeoutTask(ChannelHandlerContext ctx, Future future) { this.ctx = ctx; this.future = future; diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java index 217d8c3474..283e01166e 100644 --- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java @@ -15,9 +15,6 @@ */ package io.netty.handler.traffic; -import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -26,6 +23,9 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; +import static java.util.Objects.requireNonNull; + /** * Counts the number of read and written bytes for rate-limiting traffic. diff --git a/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java index 7da9468f8e..1e770abaea 100644 --- a/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/timeout/IdleStateHandlerTest.java @@ -21,11 +21,11 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; @@ -363,7 +363,7 @@ public class IdleStateHandlerTest { } @Override - ScheduledFuture schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { + Future schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { this.task = task; this.delayInNanos = unit.toNanos(delay); return null; diff --git a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java index 4df4f6fb8d..36f3ee9e0d 100644 --- a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java @@ -30,7 +30,6 @@ import io.netty.channel.epoll.EpollSocketChannel; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.GroupThreads; import org.openjdk.jmh.annotations.Setup; @@ -45,7 +44,7 @@ public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark { private Channel serverChan; private Channel chan; private ByteBuf abyte; - private ScheduledFuture future; + private Future future; @Setup public void setup() throws Exception { diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java index 73b0be8324..8035bfc663 100644 --- a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java @@ -20,7 +20,6 @@ import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.local.LocalHandler; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ScheduledFuture; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Param; @@ -47,7 +46,7 @@ public class RunnableScheduledFutureAdapterBenchmark extends AbstractMicrobenchm @Param({ "100", "1000", "10000", "100000" }) int num; - final List> futures = new ArrayList<>(); + final List> futures = new ArrayList<>(); @Setup(Level.Invocation) public void reset() { diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java index cd0b0fddef..263d7ef4aa 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java @@ -16,6 +16,12 @@ package io.netty.resolver.dns; import io.netty.channel.EventLoop; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureCompletionStage; +import io.netty.util.concurrent.FutureContextListener; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; import java.util.ArrayList; import java.util.Collections; @@ -23,13 +29,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Delayed; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Function; import static java.util.Collections.singletonList; @@ -39,10 +47,84 @@ import static java.util.Collections.singletonList; * @param */ abstract class Cache { - private static final AtomicReferenceFieldUpdater FUTURE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(Cache.Entries.class, ScheduledFuture.class, "expirationFuture"); + private static final AtomicReferenceFieldUpdater FUTURE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Cache.Entries.class, FutureAndDelay.class, "expirationFuture"); - private static final ScheduledFuture CANCELLED = new ScheduledFuture() { + private static final Future CANCELLED_FUTURE = new Future() { + @Override + public boolean isSuccess() { + return false; + } + + @Override + public boolean isFailed() { + return true; + } + + @Override + public boolean isCancellable() { + return false; + } + + @Override + public Throwable cause() { + return new CancellationException(); + } + + @Override + public Future addListener(FutureListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public Future addListener(C context, FutureContextListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public Future sync() throws InterruptedException { + return this; + } + + @Override + public Future syncUninterruptibly() { + return this; + } + + @Override + public Future await() throws InterruptedException { + return this; + } + + @Override + public Future awaitUninterruptibly() { + return this; + } + + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return true; + } + + @Override + public boolean await(long timeoutMillis) throws InterruptedException { + return true; + } + + @Override + public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { + return true; + } + + @Override + public boolean awaitUninterruptibly(long timeoutMillis) { + return true; + } + + @Override + public Object getNow() { + return null; + } @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -50,14 +132,7 @@ abstract class Cache { } @Override - public long getDelay(TimeUnit unit) { - // We ignore unit and always return the minimum value to ensure the TTL of the cancelled marker is - // the smallest. - return Long.MIN_VALUE; - } - - @Override - public int compareTo(Delayed o) { + public EventExecutor executor() { throw new UnsupportedOperationException(); } @@ -80,7 +155,28 @@ abstract class Cache { public Object get(long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } + + @Override + public FutureCompletionStage asStage() { + throw new UnsupportedOperationException(); + } + + @Override + public Future map(Function mapper) { + throw new UnsupportedOperationException(); + } + + @Override + public Future flatMap(Function> mapper) { + throw new UnsupportedOperationException(); + } + + @Override + public Future cascadeTo(Promise promise) { + throw new UnsupportedOperationException(); + } }; + private static final FutureAndDelay CANCELLED = new FutureAndDelay(CANCELLED_FUTURE, Integer.MIN_VALUE); // Two years are supported by all our EventLoop implementations and so safe to use as maximum. // See also: https://github.com/netty/netty/commit/b47fb817991b42ec8808c7d26538f3f2464e1fa6 @@ -163,7 +259,7 @@ abstract class Cache { private final String hostname; // Needs to be package-private to be able to access it via the AtomicReferenceFieldUpdater - volatile ScheduledFuture expirationFuture; + volatile FutureAndDelay expirationFuture; Entries(String hostname) { super(Collections.emptyList()); @@ -235,18 +331,18 @@ abstract class Cache { // We currently don't calculate a new TTL when we need to retry the CAS as we don't expect this to // be invoked very concurrently and also we use SECONDS anyway. If this ever becomes a problem // we can reconsider. - ScheduledFuture oldFuture = FUTURE_UPDATER.get(this); + FutureAndDelay oldFuture = FUTURE_UPDATER.get(this); if (oldFuture == null || oldFuture.getDelay(TimeUnit.SECONDS) > ttl) { - ScheduledFuture newFuture = loop.schedule(this, ttl, TimeUnit.SECONDS); + Future newFuture = loop.schedule(this, ttl, TimeUnit.SECONDS); // It is possible that // 1. task will fire in between this line, or // 2. multiple timers may be set if there is concurrency // (1) Shouldn't be a problem because we will fail the CAS and then the next loop will see CANCELLED // so the ttl will not be less, and we will bail out of the loop. // (2) This is a trade-off to avoid concurrency resulting in contention on a synchronized block. - if (FUTURE_UPDATER.compareAndSet(this, oldFuture, newFuture)) { + if (FUTURE_UPDATER.compareAndSet(this, oldFuture, new FutureAndDelay(newFuture, ttl))) { if (oldFuture != null) { - oldFuture.cancel(true); + oldFuture.cancel(); } break; } else { @@ -265,9 +361,9 @@ abstract class Cache { return false; } - ScheduledFuture expirationFuture = FUTURE_UPDATER.getAndSet(this, CANCELLED); + FutureAndDelay expirationFuture = FUTURE_UPDATER.getAndSet(this, CANCELLED); if (expirationFuture != null) { - expirationFuture.cancel(false); + expirationFuture.cancel(); } return true; @@ -289,4 +385,40 @@ abstract class Cache { clearAndCancel(); } } + + private static final class FutureAndDelay implements Delayed { + final Future future; + final long deadlineNanos; + + private FutureAndDelay(Future future, int ttl) { + this.future = Objects.requireNonNull(future, "future"); + deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(ttl); + } + + void cancel() { + future.cancel(false); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed other) { + return Long.compare(deadlineNanos, other.getDelay(TimeUnit.NANOSECONDS)); + } + + @Override + public boolean equals(Object o) { + return o instanceof FutureAndDelay && compareTo((FutureAndDelay) o) == 0; + } + + @Override + public int hashCode() { + int result = future.hashCode(); + result = 31 * result + (int) (deadlineNanos ^ deadlineNanos >>> 32); + return result; + } + } } diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java index d95fce064b..314fbdf4ea 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java @@ -26,7 +26,6 @@ import io.netty.handler.codec.dns.DnsSection; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -48,7 +47,7 @@ abstract class DnsQueryContext implements FutureListener timeoutFuture; + private volatile Future timeoutFuture; DnsQueryContext(DnsNameResolver parent, InetSocketAddress nameServerAddr, @@ -146,7 +145,7 @@ abstract class DnsQueryContext implements FutureListener writeFuture, Promise writePromise) { + private void onQueryWriteCompletion(Future writeFuture, Promise writePromise) { if (writeFuture.isFailed()) { writePromise.setFailure(writeFuture.cause()); tryFailure("failed to send a query via " + protocol(), writeFuture.cause(), false); @@ -215,7 +214,7 @@ abstract class DnsQueryContext implements FutureListener> future) { // Cancel the timeout task. - final ScheduledFuture timeoutFuture = this.timeoutFuture; + final Future timeoutFuture = this.timeoutFuture; if (timeoutFuture != null) { this.timeoutFuture = null; timeoutFuture.cancel(false); diff --git a/transport-blockhound-tests/src/test/java/io/netty/util/internal/NettyBlockHoundIntegrationTest.java b/transport-blockhound-tests/src/test/java/io/netty/util/internal/NettyBlockHoundIntegrationTest.java index faa5354fa6..a8398757c0 100644 --- a/transport-blockhound-tests/src/test/java/io/netty/util/internal/NettyBlockHoundIntegrationTest.java +++ b/transport-blockhound-tests/src/test/java/io/netty/util/internal/NettyBlockHoundIntegrationTest.java @@ -45,7 +45,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateExecutor; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.internal.Hidden.NettyBlockHoundIntegration; import org.hamcrest.Matchers; @@ -141,7 +140,7 @@ public class NettyBlockHoundIntegrationTest { private static void testEventExecutorTakeTask(EventExecutor eventExecutor) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - ScheduledFuture f = eventExecutor.schedule(latch::countDown, 10, TimeUnit.MILLISECONDS); + Future f = eventExecutor.schedule(latch::countDown, 10, TimeUnit.MILLISECONDS); f.sync(); latch.await(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 75c24d70a0..ecf70734a3 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -36,6 +36,7 @@ import io.netty.channel.unix.IovArray; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.io.IOException; @@ -47,7 +48,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.UnresolvedAddressException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL; @@ -62,7 +62,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * connection attempts will fail. */ private Promise connectPromise; - private ScheduledFuture connectTimeoutFuture; + private Future connectTimeoutFuture; private SocketAddress requestedRemoteAddress; protected EpollRegistration registration; @@ -164,7 +164,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann connectPromise = null; } - ScheduledFuture future = connectTimeoutFuture; + Future future = connectTimeoutFuture; if (future != null) { future.cancel(false); connectTimeoutFuture = null; diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 5e03eb90de..1db93a573e 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -34,6 +34,7 @@ import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.io.IOException; @@ -45,7 +46,6 @@ import java.nio.channels.AlreadyConnectedException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.UnresolvedAddressException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL; @@ -59,7 +59,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan * connection attempts will fail. */ private Promise connectPromise; - private ScheduledFuture connectTimeoutFuture; + private Future connectTimeoutFuture; private SocketAddress requestedRemoteAddress; private KQueueRegistration registration; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 69223eba3f..da1655374b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -26,6 +26,7 @@ import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -36,7 +37,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -58,7 +58,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { * connection attempts will fail. */ private Promise connectPromise; - private ScheduledFuture connectTimeoutFuture; + private Future connectTimeoutFuture; private SocketAddress requestedRemoteAddress; /** @@ -460,7 +460,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { connectPromise = null; } - ScheduledFuture future = connectTimeoutFuture; + Future future = connectTimeoutFuture; if (future != null) { future.cancel(false); connectTimeoutFuture = null; diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index 934ab5a29c..369c4b7892 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -15,7 +15,21 @@ */ package io.netty.channel.embedded; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundInvoker; +import io.netty.channel.ChannelPipeline; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; @@ -25,22 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelId; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -124,7 +122,7 @@ public class EmbeddedChannelTest { public void testScheduling() throws Exception { EmbeddedChannel ch = new EmbeddedChannel(new ChannelHandler() { }); final CountDownLatch latch = new CountDownLatch(2); - ScheduledFuture future = ch.executor().schedule(latch::countDown, 1, TimeUnit.SECONDS); + Future future = ch.executor().schedule(latch::countDown, 1, TimeUnit.SECONDS); future.addListener(future1 -> latch.countDown()); long next = ch.runScheduledPendingTasks(); assertTrue(next > 0); @@ -138,7 +136,7 @@ public class EmbeddedChannelTest { @Test public void testScheduledCancelled() throws Exception { EmbeddedChannel ch = new EmbeddedChannel(new ChannelHandler() { }); - ScheduledFuture future = ch.executor().schedule(() -> { }, 1, TimeUnit.DAYS); + Future future = ch.executor().schedule(() -> { }, 1, TimeUnit.DAYS); ch.finish(); assertTrue(future.isCancelled()); }