diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java index 7e6ba2038e..9a26f96d83 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java @@ -358,7 +358,7 @@ public abstract class WebSocketClientHandshaker { // Delay the removal of the decoder so the user can setup the pipeline if needed to handle // WebSocketFrame messages. // See https://github.com/netty/netty/issues/4533 - channel.eventLoop().execute(() -> p.remove(codec)); + channel.executor().execute(() -> p.remove(codec)); } else { if (p.get(HttpRequestEncoder.class) != null) { // Remove the encoder part of the codec as the user may start writing frames after this method returns. @@ -370,7 +370,7 @@ public abstract class WebSocketClientHandshaker { // Delay the removal of the decoder so the user can setup the pipeline if needed to handle // WebSocketFrame messages. // See https://github.com/netty/netty/issues/4533 - channel.eventLoop().execute(() -> p.remove(context.handler())); + channel.executor().execute(() -> p.remove(context.handler())); } } @@ -523,7 +523,7 @@ public abstract class WebSocketClientHandshaker { // Also, close might be called twice from different threads. if (future.isSuccess() && channel.isActive() && FORCE_CLOSE_INIT_UPDATER.compareAndSet(handshaker, 0, 1)) { - final Future forceCloseFuture = channel.eventLoop().schedule(() -> { + final Future forceCloseFuture = channel.executor().schedule(() -> { if (channel.isActive()) { channel.close(); forceCloseComplete = true; diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/HttpServerUpgradeHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/HttpServerUpgradeHandlerTest.java index e3182831db..6671fcf38b 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/HttpServerUpgradeHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/HttpServerUpgradeHandlerTest.java @@ -101,7 +101,7 @@ public class HttpServerUpgradeHandlerTest { assertTrue(inReadCall); writeUpgradeMessage = true; Promise promise = ctx.newPromise(); - ctx.channel().eventLoop().execute(() -> ctx.write(msg).addListener(new PromiseNotifier<>(promise))); + ctx.channel().executor().execute(() -> ctx.write(msg).addListener(new PromiseNotifier<>(promise))); promise.addListener(f -> writeFlushed = true); return promise; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java index 24e8a16e90..22e3602d33 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java @@ -256,7 +256,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements if (task == null) { fireChannelWritabilityChangedTask = task = pipeline::fireChannelWritabilityChanged; } - eventLoop().execute(task); + executor().execute(task); } else { pipeline.fireChannelWritabilityChanged(); } @@ -308,8 +308,8 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements } @Override - public EventLoop eventLoop() { - return parent().eventLoop(); + public EventLoop executor() { + return parent().executor(); } @Override @@ -400,7 +400,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements * channel. */ void fireChildRead(Http2Frame frame) { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); if (!isActive()) { ReferenceCountUtil.release(frame); } else if (readStatus != ReadStatus.IDLE) { @@ -427,7 +427,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements } void fireChildReadComplete() { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); assert readStatus != ReadStatus.IDLE || !readCompletePending; unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false); } @@ -615,7 +615,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements // -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet // // which means the execution of two inbound handler methods of the same handler overlap undesirably. - eventLoop().execute(task); + executor().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index 866ffd3032..c9c82068a4 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -118,7 +118,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec { @Override public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception { - if (ctx.executor() != ctx.channel().eventLoop()) { + if (ctx.executor() != ctx.channel().executor()) { throw new IllegalStateException("EventExecutor must be EventLoop of Channel"); } this.ctx = ctx; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexHandler.java index 722a7f27c1..c2ff0e1111 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexHandler.java @@ -140,7 +140,7 @@ public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler { @Override protected void handlerAdded0(ChannelHandlerContext ctx) { - if (ctx.executor() != ctx.channel().eventLoop()) { + if (ctx.executor() != ctx.channel().executor()) { throw new IllegalStateException("EventExecutor must be EventLoop of Channel"); } this.ctx = ctx; diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamChannelBootstrap.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamChannelBootstrap.java index 07834f604a..a27b660960 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamChannelBootstrap.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2StreamChannelBootstrap.java @@ -102,7 +102,7 @@ public final class Http2StreamChannelBootstrap { * @return the {@link Future} that will be notified once the channel was opened successfully or it failed. */ public Future open() { - return open(channel.eventLoop().newPromise()); + return open(channel.executor().newPromise()); } /** diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodecTest.java index ca5360ed23..7ea7ed79e1 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodecTest.java @@ -71,7 +71,7 @@ public class Http2ClientUpgradeCodecTest { // Flush the channel to ensure we write out all buffered data channel.flush(); - channel.eventLoop().submit(() -> { + channel.executor().submit(() -> { codec.upgradeTo(ctx, null); return null; }).sync(); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java index 17b5c02452..f5b9f6e26d 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java @@ -224,7 +224,7 @@ public class Http2FrameCodecTest { EmbeddedChannel em = new EmbeddedChannel(codec); AtomicReference errorRef = new AtomicReference<>(); - em.eventLoop().execute(() -> { + em.executor().execute(() -> { try { // We call #consumeBytes on a stream id which has not been seen yet to emulate the case // where a stream is deregistered which in reality can happen in response to a RST. @@ -545,7 +545,7 @@ public class Http2FrameCodecTest { int connectionWindowSizeBefore = localFlow.windowSize(connectionStream); AtomicReference errorRef = new AtomicReference<>(); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { try { // We only replenish the flow control window after the amount consumed drops below the following // threshold. We make the threshold very "high" so that window updates will be sent when the delta is @@ -728,7 +728,7 @@ public class Http2FrameCodecTest { public void streamIdentifiersExhausted() throws Exception { int maxServerStreamId = Integer.MAX_VALUE - 1; - channel.eventLoop().submit(() -> { + channel.executor().submit(() -> { assertNotNull(frameCodec.connection().local().createStream(maxServerStreamId, false)); return null; }).sync(); @@ -807,7 +807,7 @@ public class Http2FrameCodecTest { final Set activeStreams = new HashSet<>(); final AtomicReference errorRef = new AtomicReference<>(); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { try { frameCodec.forEachActiveStream(stream -> { activeStreams.add(stream); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java index a289205295..4f9c0aa08d 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameInboundWriter.java @@ -119,7 +119,7 @@ final class Http2FrameInboundWriter { @Override public EventExecutor executor() { - return channel.eventLoop(); + return channel.executor(); } @Override diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexClientUpgradeTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexClientUpgradeTest.java index ecc8f0d1f7..8d13e6065f 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexClientUpgradeTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexClientUpgradeTest.java @@ -66,7 +66,7 @@ public abstract class Http2MultiplexClientUpgradeTest C codec = newCodec(upgradeHandler); EmbeddedChannel ch = new EmbeddedChannel(codec, newMultiplexer(upgradeHandler)); - ch.eventLoop().submit(() -> { + ch.executor().submit(() -> { codec.onHttpClientUpgrade(); return null; }).sync(); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java index 500f5460aa..8da8ca31b5 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java @@ -222,7 +222,7 @@ public class Http2MultiplexTransportTest { ctx.write(new DefaultHttp2DataFrame( Unpooled.copiedBuffer("Hello World", CharsetUtil.US_ASCII), true)); - ctx.channel().eventLoop().execute(ctx::flush); + ctx.channel().executor().execute(ctx::flush); }); }, 500, MILLISECONDS); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodecTest.java index f72ac4c2a2..ae2cd02682 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodecTest.java @@ -75,7 +75,7 @@ public class Http2ServerUpgradeCodecTest { } else { codec = new Http2ServerUpgradeCodec((Http2FrameCodec) handler, multiplexer); } - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { assertTrue(codec.prepareUpgradeResponse(ctx, request, new DefaultHttpHeaders())); codec.upgradeTo(ctx, request); }); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2StreamChannelBootstrapTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2StreamChannelBootstrapTest.java index 2c2f641eb5..0ba2ba6acb 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2StreamChannelBootstrapTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2StreamChannelBootstrapTest.java @@ -93,7 +93,7 @@ public class Http2StreamChannelBootstrapTest { assertTrue(serverChannelLatch.await(3, SECONDS)); Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(clientChannel); - final Promise promise = clientChannel.eventLoop().newPromise(); + final Promise promise = clientChannel.executor().newPromise(); clientChannel.close().sync(); bootstrap.open(promise); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java index bc8efce0b4..582a1d7b25 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2TestUtil.java @@ -20,7 +20,6 @@ import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; @@ -63,7 +62,7 @@ public final class Http2TestUtil { * Runs the given operation within the event loop thread of the given {@link Channel}. */ static void runInChannel(Channel channel, final Http2Runnable runnable) { - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { try { runnable.run(); } catch (Http2Exception e) { diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java index 10631da5fb..a51753a36b 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java @@ -272,7 +272,7 @@ public class Lz4FrameEncoderTest extends AbstractEncoderTest { clientChannel = bs.connect(serverChannel.localAddress()).get(); final Channel finalClientChannel = clientChannel; - clientChannel.eventLoop().execute(() -> { + clientChannel.executor().execute(() -> { finalClientChannel.close(); final int size = 27; ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(size, size); diff --git a/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java index ed70287d9a..a6a2920917 100644 --- a/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java +++ b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java @@ -44,7 +44,7 @@ public class HexDumpProxyFrontendHandler implements ChannelHandler { // Start the connection attempt. Bootstrap b = new Bootstrap(); - b.group(inboundChannel.eventLoop()) + b.group(inboundChannel.executor()) .channel(ctx.channel().getClass()) .handler(new ChannelInitializer() { @Override diff --git a/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java b/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java index 0860ead0ef..dd694a9a45 100644 --- a/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java +++ b/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java @@ -61,7 +61,7 @@ public final class SocksServerConnectHandler extends SimpleChannelInboundHandler }); final Channel inboundChannel = ctx.channel(); - b.group(inboundChannel.eventLoop()) + b.group(inboundChannel.executor()) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .option(ChannelOption.SO_KEEPALIVE, true) @@ -104,7 +104,7 @@ public final class SocksServerConnectHandler extends SimpleChannelInboundHandler }); final Channel inboundChannel = ctx.channel(); - b.group(inboundChannel.eventLoop()) + b.group(inboundChannel.executor()) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .option(ChannelOption.SO_KEEPALIVE, true) diff --git a/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java b/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java index a890562de2..6e3869b2fb 100644 --- a/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java +++ b/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java @@ -68,7 +68,7 @@ public class UptimeClientHandler extends SimpleChannelInboundHandler { public void channelUnregistered(final ChannelHandlerContext ctx) throws Exception { println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + 's'); - ctx.channel().eventLoop().schedule(() -> { + ctx.channel().executor().schedule(() -> { println("Reconnecting to: " + UptimeClient.HOST + ':' + UptimeClient.PORT); UptimeClient.connect(); }, UptimeClient.RECONNECT_DELAY, TimeUnit.SECONDS); diff --git a/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyServer.java b/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyServer.java index 9ca32d06ed..1bed137652 100644 --- a/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyServer.java +++ b/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyServer.java @@ -167,7 +167,7 @@ abstract class ProxyServer { boolean finished = handleProxyProtocol(ctx, msg); if (finished) { this.finished = true; - Future f = connectToDestination(ctx.channel().eventLoop(), new BackendHandler(ctx)); + Future f = connectToDestination(ctx.channel().executor(), new BackendHandler(ctx)); f.addListener(future -> { if (!future.isSuccess()) { recordException(future.cause()); diff --git a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java index 6ad3027dfd..62bb9e7e69 100644 --- a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java +++ b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java @@ -199,7 +199,7 @@ public class FlushConsolidationHandler implements ChannelHandler { private void scheduleFlush(final ChannelHandlerContext ctx) { if (nextScheduledFlush == null) { // Run as soon as possible, but still yield to give a chance for additional writes to enqueue. - nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask); + nextScheduledFlush = ctx.channel().executor().submit(flushTask); } } diff --git a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java index 159ca27afd..da374847e5 100644 --- a/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flow/FlowControlHandlerTest.java @@ -505,7 +505,7 @@ public class FlowControlHandlerTest { if (num >= 3) { //We have received 3 messages. Remove myself later final ChannelHandler handler = this; - ctx.channel().eventLoop().execute(new Runnable() { + ctx.channel().executor().execute(new Runnable() { @Override public void run() { ctx.pipeline().remove(handler); diff --git a/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java b/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java index e9782ee803..788f0aaa23 100644 --- a/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java @@ -36,7 +36,7 @@ public class FlushConsolidationHandlerTest { public void testFlushViaScheduledTask() { final AtomicInteger flushCount = new AtomicInteger(); EmbeddedChannel channel = newChannel(flushCount, true); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { // Flushes should not go through immediately, as they're scheduled as an async task channel.flush(); assertEquals(0, flushCount.get()); @@ -51,7 +51,7 @@ public class FlushConsolidationHandlerTest { public void testFlushViaThresholdOutsideOfReadLoop() { final AtomicInteger flushCount = new AtomicInteger(); EmbeddedChannel channel = newChannel(flushCount, true); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { // After a given threshold, the async task should be bypassed and a flush should be triggered immediately for (int i = 0; i < EXPLICIT_FLUSH_AFTER_FLUSHES; i++) { channel.flush(); diff --git a/handler/src/test/java/io/netty/handler/ssl/SSLEngineTest.java b/handler/src/test/java/io/netty/handler/ssl/SSLEngineTest.java index a0a8a296d9..c2e3b43d3b 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SSLEngineTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SSLEngineTest.java @@ -1493,7 +1493,7 @@ public abstract class SSLEngineTest { // The server then attempts to trigger a flush operation once the application data is // received from the client. The flush will encrypt all data and should not result in // deadlock. - ctx.channel().eventLoop().schedule(() -> { + ctx.channel().executor().schedule(() -> { ctx.writeAndFlush(ctx.alloc().buffer(1).writeByte(101)); }, 500, TimeUnit.MILLISECONDS); } diff --git a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java index 2825dc03c4..4ba6e14035 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java @@ -514,11 +514,11 @@ public class SslHandlerTest { sslHandler.setHandshakeTimeoutMillis(1000); ch.pipeline().addFirst(sslHandler); sslHandler.handshakeFuture().addListener(future -> { - ch.eventLoop().execute(() -> { + ch.executor().execute(() -> { ch.pipeline().remove(sslHandler); // Schedule the close so removal has time to propagate exception if any. - ch.eventLoop().execute(ch::close); + ch.executor().execute(ch::close); }); }); diff --git a/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java b/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java index e21c958547..22a57ddef1 100644 --- a/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java @@ -106,7 +106,7 @@ public class TrafficShapingHandlerTest { ch.writeAndFlush(Unpooled.wrappedBuffer("bar".getBytes(CharsetUtil.UTF_8))).await(); assertNotNull(attr.get()); final Channel clientChannel = ch; - ch.eventLoop().submit(() -> { + ch.executor().submit(() -> { clientChannel.pipeline().remove("traffic-shaping"); }).await(); //the attribute--reopen task must be released. diff --git a/microbench/src/main/java/io/netty/microbench/channel/EmbeddedChannelHandlerContext.java b/microbench/src/main/java/io/netty/microbench/channel/EmbeddedChannelHandlerContext.java index ad704cfffb..ad05e775dc 100644 --- a/microbench/src/main/java/io/netty/microbench/channel/EmbeddedChannelHandlerContext.java +++ b/microbench/src/main/java/io/netty/microbench/channel/EmbeddedChannelHandlerContext.java @@ -44,7 +44,7 @@ public abstract class EmbeddedChannelHandlerContext implements ChannelHandlerCon this.alloc = requireNonNull(alloc, "alloc"); this.channel = requireNonNull(channel, "channel"); this.handler = requireNonNull(handler, "handler"); - eventLoop = requireNonNull(channel.eventLoop(), "eventLoop"); + eventLoop = requireNonNull(channel.executor(), "eventLoop"); } protected abstract void handleException(Throwable t); diff --git a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java index 01917892da..4df4f6fb8d 100644 --- a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java @@ -139,12 +139,12 @@ public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark { @Benchmark public Object executeSingle() throws Exception { - return chan.eventLoop().submit(runnable).get(); + return chan.executor().submit(runnable).get(); } @Benchmark @GroupThreads(3) public Object executeMulti() throws Exception { - return chan.eventLoop().submit(runnable).get(); + return chan.executor().submit(runnable).get(); } } diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsAddressResolveContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsAddressResolveContext.java index ebc24cfd47..e498dec7db 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsAddressResolveContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsAddressResolveContext.java @@ -79,12 +79,12 @@ final class DnsAddressResolveContext extends DnsResolveContext { @Override void cache(String hostname, DnsRecord[] additionals, DnsRecord result, InetAddress convertedResult) { - resolveCache.cache(hostname, additionals, convertedResult, result.timeToLive(), parent.ch.eventLoop()); + resolveCache.cache(hostname, additionals, convertedResult, result.timeToLive(), parent.ch.executor()); } @Override void cache(String hostname, DnsRecord[] additionals, UnknownHostException cause) { - resolveCache.cache(hostname, additionals, cause, parent.ch.eventLoop()); + resolveCache.cache(hostname, additionals, cause, parent.ch.executor()); } @Override diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java index 4fea0cecfa..b25ac14a9c 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java @@ -1157,7 +1157,7 @@ public class DnsNameResolver extends InetNameResolver { InetSocketAddress nameServerAddr, DnsQuestion question) { return query0(nameServerAddr, question, EMPTY_ADDITIONALS, true, ch.newPromise(), - ch.eventLoop().newPromise()); + ch.executor().newPromise()); } /** @@ -1167,7 +1167,7 @@ public class DnsNameResolver extends InetNameResolver { InetSocketAddress nameServerAddr, DnsQuestion question, Iterable additionals) { return query0(nameServerAddr, question, toArray(additionals, false), true, ch.newPromise(), - ch.eventLoop().newPromise()); + ch.executor().newPromise()); } /** @@ -1288,7 +1288,7 @@ public class DnsNameResolver extends InetNameResolver { final Channel channel = future.getNow(); Promise> promise = - channel.eventLoop().newPromise(); + channel.executor().newPromise(); final TcpDnsQueryContext tcpCtx = new TcpDnsQueryContext(DnsNameResolver.this, channel, (InetSocketAddress) channel.remoteAddress(), qCtx.question(), EMPTY_ADDITIONALS, promise); diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java index b744af333d..76ce8fd42f 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java @@ -156,7 +156,7 @@ abstract class DnsQueryContext implements FutureListener 0) { - timeoutFuture = parent.ch.eventLoop().schedule(() -> { + timeoutFuture = parent.ch.executor().schedule(() -> { if (promise.isDone()) { // Received a response before the query times out. return; diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java index 1a4e49cd2e..21e194b146 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java @@ -414,7 +414,7 @@ abstract class DnsResolveContext { } final Promise writePromise = parent.ch.newPromise(); final Promise> queryPromise = - parent.ch.eventLoop().newPromise(); + parent.ch.executor().newPromise(); final Future> f = parent.query0(nameServerAddr, question, additionals, flush, writePromise, queryPromise); diff --git a/resolver-dns/src/test/java/io/netty/resolver/dns/DnsResolveContextTest.java b/resolver-dns/src/test/java/io/netty/resolver/dns/DnsResolveContextTest.java index f6b19f9ed7..cb85ea82c5 100644 --- a/resolver-dns/src/test/java/io/netty/resolver/dns/DnsResolveContextTest.java +++ b/resolver-dns/src/test/java/io/netty/resolver/dns/DnsResolveContextTest.java @@ -42,15 +42,15 @@ public class DnsResolveContextTest { EmbeddedChannel channel = new EmbeddedChannel(); DnsCnameCache cache = new DefaultDnsCnameCache(); if (chainLength == 1) { - cache.cache(HOSTNAME, HOSTNAME, Long.MAX_VALUE, channel.eventLoop()); + cache.cache(HOSTNAME, HOSTNAME, Long.MAX_VALUE, channel.executor()); } else { String lastName = HOSTNAME; for (int i = 1; i < chainLength; i++) { String nextName = i + "." + lastName; - cache.cache(lastName, nextName, Long.MAX_VALUE, channel.eventLoop()); + cache.cache(lastName, nextName, Long.MAX_VALUE, channel.executor()); lastName = nextName; } - cache.cache(lastName, HOSTNAME, Long.MAX_VALUE, channel.eventLoop()); + cache.cache(lastName, HOSTNAME, Long.MAX_VALUE, channel.executor()); } return cache; } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCloseForciblyTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCloseForciblyTest.java index 3d81103925..570f62c7a5 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCloseForciblyTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCloseForciblyTest.java @@ -37,7 +37,7 @@ public class SocketCloseForciblyTest extends AbstractSocketTest { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { final SocketChannel childChannel = (SocketChannel) msg; // Dispatch on the EventLoop as all operation on Unsafe should be done while on the EventLoop. - childChannel.eventLoop().execute(() -> { + childChannel.executor().execute(() -> { childChannel.config().setSoLinger(0); childChannel.unsafe().closeForcibly(); }); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java index f116bc51fb..6498638fd5 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java @@ -326,7 +326,7 @@ public class SocketHalfClosedTest extends AbstractSocketTest { // but the close will be done after the Selector did process all events. Because of // this we will need to give it a bit time to ensure the FD is actual closed before we // count down the latch and try to write. - channel.eventLoop().schedule(followerCloseLatch::countDown, 200, MILLISECONDS); + channel.executor().schedule(followerCloseLatch::countDown, 200, MILLISECONDS); })); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 98d7c7afd2..75c24d70a0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -175,7 +175,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // if SO_LINGER is used. // // See https://github.com/netty/netty/issues/7159 - EventLoop loop = eventLoop(); + EventLoop loop = executor(); if (loop.inEventLoop()) { doDeregister(); } else { @@ -255,7 +255,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann final void clearEpollIn() { // Only clear if registered with an EventLoop as otherwise if (isRegistered()) { - final EventLoop loop = eventLoop(); + final EventLoop loop = executor(); final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); if (loop.inEventLoop()) { unsafe.clearEpollIn0(); @@ -456,7 +456,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann return; } epollInReadyRunnablePending = true; - eventLoop().execute(epollInReadyRunnable); + executor().execute(epollInReadyRunnable); } /** @@ -565,7 +565,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } protected final void clearEpollIn0() { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); try { readPending = false; clearFlag(Native.EPOLLIN); @@ -599,7 +599,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(() -> { + connectTimeoutFuture = executor().schedule(() -> { Promise connectPromise = AbstractEpollChannel.this.connectPromise; if (connectPromise != null && !connectPromise.isDone() && connectPromise.tryFailure(new ConnectTimeoutException( @@ -667,7 +667,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); boolean connectStillInProgress = false; try { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index e35603ad09..04e71d6fa1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -95,7 +95,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im @Override void epollInReady() { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); final ChannelConfig config = config(); if (shouldBreakEpollInReady(config)) { clearEpollIn0(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 67dcaf4d1b..ee6a86679e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -312,7 +312,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im clearFlag(Native.EPOLLOUT); // We used our writeSpin quantum, and should try to write again later. - eventLoop().execute(flushTask); + executor().execute(flushTask); } else { // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up // when it can accept more data. @@ -430,7 +430,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im @Override public Future shutdownOutput(final Promise promise) { - EventLoop loop = eventLoop(); + EventLoop loop = executor(); if (loop.inEventLoop()) { ((AbstractUnsafe) unsafe()).shutdownOutput(promise); } else { @@ -451,7 +451,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im if (closeExecutor != null) { closeExecutor.execute(() -> shutdownInput0(promise)); } else { - EventLoop loop = eventLoop(); + EventLoop loop = executor(); if (loop.inEventLoop()) { shutdownInput0(promise); } else { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 8b0d5c1391..b289b26e9b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -459,7 +459,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override void epollInReady() { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); EpollDatagramChannelConfig config = config(); if (shouldBreakEpollInReady(config)) { clearEpollIn0(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainDatagramChannel.java index 7f290a50dc..affb106a1b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainDatagramChannel.java @@ -292,7 +292,7 @@ public final class EpollDomainDatagramChannel extends AbstractEpollChannel imple @Override void epollInReady() { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); final DomainDatagramChannelConfig config = config(); if (shouldBreakEpollInReady(config)) { clearEpollIn0(); diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFdTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFdTest.java index 1e104e0232..c61772d87d 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFdTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFdTest.java @@ -62,7 +62,7 @@ public class EpollDomainSocketFdTest extends AbstractSocketTest { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // Create new channel and obtain a file descriptor from it. - final EpollDomainSocketChannel ch = new EpollDomainSocketChannel(ctx.channel().eventLoop()); + final EpollDomainSocketChannel ch = new EpollDomainSocketChannel(ctx.channel().executor()); ctx.writeAndFlush(ch.fd()).addListener(future -> { if (!future.isSuccess()) { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 2e0d7b3606..5e03eb90de 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -308,7 +308,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan final void clearReadFilter() { // Only clear if registered with an EventLoop as otherwise if (isRegistered()) { - final EventLoop loop = eventLoop(); + final EventLoop loop = executor(); final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe(); if (loop.inEventLoop()) { unsafe.clearReadFilter0(); @@ -507,11 +507,11 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan return; } readReadyRunnablePending = true; - eventLoop().execute(readReadyRunnable); + executor().execute(readReadyRunnable); } protected final void clearReadFilter0() { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); try { readPending = false; readFilter(false); @@ -550,7 +550,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(() -> { + connectTimeoutFuture = executor().schedule(() -> { Promise connectPromise = AbstractKQueueChannel.this.connectPromise; if (connectPromise != null && !connectPromise.isDone() && connectPromise.tryFailure(new ConnectTimeoutException( @@ -618,7 +618,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); boolean connectStillInProgress = false; try { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java index 29c001f23b..cb5fee95ad 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java @@ -89,7 +89,7 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel @Override void readReady(KQueueRecvByteAllocatorHandle allocHandle) { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadFilter0(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index 338b89aee7..f7e4efbb0a 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -294,7 +294,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel writeFilter(false); // We used our writeSpin quantum, and should try to write again later. - eventLoop().execute(flushTask); + executor().execute(flushTask); } else { // Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up // when it can accept more data. @@ -403,7 +403,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel @Override public Future shutdownOutput(final Promise promise) { - EventLoop loop = eventLoop(); + EventLoop loop = executor(); if (loop.inEventLoop()) { ((AbstractUnsafe) unsafe()).shutdownOutput(promise); } else { @@ -419,7 +419,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel @Override public Future shutdownInput(final Promise promise) { - EventLoop loop = eventLoop(); + EventLoop loop = executor(); if (loop.inEventLoop()) { shutdownInput0(promise); } else { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index f11899507f..0dce1c2193 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -352,7 +352,7 @@ public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel i @Override void readReady(KQueueRecvByteAllocatorHandle allocHandle) { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); final DatagramChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadFilter0(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainDatagramChannel.java index 8fa50f8534..90c0d1c766 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainDatagramChannel.java @@ -241,7 +241,7 @@ public final class KQueueDomainDatagramChannel extends AbstractKQueueDatagramCha @Override void readReady(KQueueRecvByteAllocatorHandle allocHandle) { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); final DomainDatagramChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadFilter0(); diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketFdTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketFdTest.java index 4aba8a5b60..0fce83ed8c 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketFdTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketFdTest.java @@ -62,7 +62,7 @@ public class KQueueDomainSocketFdTest extends AbstractSocketTest { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // Create new channel and obtain a file descriptor from it. - final KQueueDomainSocketChannel ch = new KQueueDomainSocketChannel(ctx.channel().eventLoop()); + final KQueueDomainSocketChannel ch = new KQueueDomainSocketChannel(ctx.channel().executor()); ctx.writeAndFlush(ch.fd()).addListener(future -> { if (!future.isSuccess()) { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index 4963092553..b1c6baf4cf 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -342,7 +342,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett @Override public Future bindAddress(final InetAddress localAddress, final Promise promise) { - if (eventLoop().inEventLoop()) { + if (executor().inEventLoop()) { try { javaChannel().bindAddress(localAddress); promise.setSuccess(null); @@ -350,7 +350,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett promise.setFailure(t); } } else { - eventLoop().execute(() -> bindAddress(localAddress, promise)); + executor().execute(() -> bindAddress(localAddress, promise)); } return promise; } @@ -362,7 +362,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett @Override public Future unbindAddress(final InetAddress localAddress, final Promise promise) { - if (eventLoop().inEventLoop()) { + if (executor().inEventLoop()) { try { javaChannel().unbindAddress(localAddress); promise.setSuccess(null); @@ -370,7 +370,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett promise.setFailure(t); } } else { - eventLoop().execute(() -> unbindAddress(localAddress, promise)); + executor().execute(() -> unbindAddress(localAddress, promise)); } return promise; } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index 0d3d5e57e6..05bb874225 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -162,7 +162,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel @Override public Future bindAddress(final InetAddress localAddress, final Promise promise) { - if (eventLoop().inEventLoop()) { + if (executor().inEventLoop()) { try { javaChannel().bindAddress(localAddress); promise.setSuccess(null); @@ -170,7 +170,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel promise.setFailure(t); } } else { - eventLoop().execute(() -> bindAddress(localAddress, promise)); + executor().execute(() -> bindAddress(localAddress, promise)); } return promise; } @@ -182,7 +182,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel @Override public Future unbindAddress(final InetAddress localAddress, final Promise promise) { - if (eventLoop().inEventLoop()) { + if (executor().inEventLoop()) { try { javaChannel().unbindAddress(localAddress); promise.setSuccess(null); @@ -190,7 +190,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel promise.setFailure(t); } } else { - eventLoop().execute(() -> unbindAddress(localAddress, promise)); + executor().execute(() -> unbindAddress(localAddress, promise)); } return promise; } diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index 273daa79fb..d35cecb92e 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -317,7 +317,7 @@ public abstract class AbstractBootstrap, C final SocketAddress localAddress, final Promise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { if (regFuture.isSuccess()) { PromiseNotifier.cascade(channel.bind(localAddress), promise) .addListener(channel, ChannelFutureListeners.CLOSE_ON_FAILURE); diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index a934d8464e..99488ac177 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -217,7 +217,7 @@ public class Bootstrap extends AbstractBootstrap promise) { try { - final EventLoop eventLoop = channel.eventLoop(); + final EventLoop eventLoop = channel.executor(); final AddressResolver resolver = this.resolver.getResolver(eventLoop); if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) { @@ -260,7 +260,7 @@ public class Bootstrap extends AbstractBootstrap promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { final Future future; if (localAddress == null) { future = channel.connect(remoteAddress); @@ -281,7 +281,7 @@ public class Bootstrap extends AbstractBootstrap init(Channel channel) { - Promise promise = new DefaultPromise<>(channel.eventLoop()); + Promise promise = new DefaultPromise<>(channel.executor()); setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, newAttributesArray()); @@ -184,7 +184,7 @@ public class ServerBootstrap extends AbstractBootstrap { + ch.executor().execute(() -> { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildHandler, currentChildOptions, currentChildAttrs)); promise.setSuccess(ch); @@ -241,7 +241,7 @@ public class ServerBootstrap extends AbstractBootstrap + executor().execute(() -> closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause)); } }); @@ -850,7 +850,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha // -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet // // which means the execution of two inbound handler methods of the same handler overlap undesirably. - eventLoop().execute(task); + executor().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } @@ -900,7 +900,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha * Sub-classes may override this method */ protected void doRegister() throws Exception { - eventLoop().unsafe().register(this); + executor().unsafe().register(this); } /** @@ -933,7 +933,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha * Sub-classes may override this method */ protected void doDeregister() throws Exception { - eventLoop().unsafe().deregister(this); + executor().unsafe().deregister(this); } /** diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 9d9b9a142a..47a6746386 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -86,7 +86,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl /** * Return the {@link EventLoop} this {@link Channel} was registered to. */ - EventLoop eventLoop(); + EventLoop executor(); /** * Returns the parent of this channel. @@ -261,21 +261,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl return this; } - @Override - default Promise newPromise() { - return eventLoop().newPromise(); - } - - @Override - default Future newSucceededFuture() { - return eventLoop().newSucceededFuture(null); - } - - @Override - default Future newFailedFuture(Throwable cause) { - return eventLoop().newFailedFuture(cause); - } - /** * Unsafe operations that should never be called from user-code. These methods * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index a628c6db08..2894ac9978 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -20,7 +20,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.AttributeMap; -import io.netty.util.concurrent.EventExecutor; /** * Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline} @@ -104,11 +103,11 @@ import io.netty.util.concurrent.EventExecutor; * // calculated correctly 4 times once the two pipelines (p1 and p2) are active. * FactorialHandler fh = new FactorialHandler(); * - * {@link ChannelPipeline} p1 = {@link Channels}.pipeline(); + * {@link ChannelPipeline} p1 = {@link Channel}.pipeline(); * p1.addLast("f1", fh); * p1.addLast("f2", fh); * - * {@link ChannelPipeline} p2 = {@link Channels}.pipeline(); + * {@link ChannelPipeline} p2 = {@link Channel}.pipeline(); * p2.addLast("f3", fh); * p2.addLast("f4", fh); * @@ -127,11 +126,6 @@ public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvok */ Channel channel(); - /** - * Returns the {@link EventExecutor} which is used to execute an arbitrary task. - */ - EventExecutor executor(); - /** * The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler} * was added to the {@link ChannelPipeline}. This name can also be used to access the registered diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index d1ecde09e3..967548d341 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -610,7 +610,7 @@ public final class ChannelOutboundBuffer { if (task == null) { fireChannelWritabilityChangedTask = task = pipeline::fireChannelWritabilityChanged; } - channel.eventLoop().execute(task); + channel.executor().execute(task); } else { pipeline.fireChannelWritabilityChanged(); } @@ -655,7 +655,7 @@ public final class ChannelOutboundBuffer { void close(final Throwable cause, final boolean allowChannelOpen) { if (inFail) { - channel.eventLoop().execute(() -> close(cause, allowChannelOpen)); + channel.executor().execute(() -> close(cause, allowChannelOpen)); return; } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java index 9329d223d8..c841d99410 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java @@ -147,19 +147,32 @@ public interface ChannelOutboundInvoker { /** * Return a new {@link Promise}. */ - Promise newPromise(); + default Promise newPromise() { + return executor().newPromise(); + } /** * Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()} * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also * every call of blocking methods will just return without blocking. */ - Future newSucceededFuture(); + default Future newSucceededFuture() { + return executor().newSucceededFuture(null); + } /** * Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()} * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also * every call of blocking methods will just return without blocking. */ - Future newFailedFuture(Throwable cause); + default Future newFailedFuture(Throwable cause) { + return executor().newFailedFuture(cause); + } + + /** + * Returns the {@link EventExecutor} that is used to execute the operations of this {@link ChannelOutboundInvoker}. + * + * @return the executor. + */ + EventExecutor executor(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 5f7bc58fb5..60ede112f5 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -16,7 +16,6 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; -import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Promise; import java.net.SocketAddress; @@ -538,9 +537,4 @@ public interface ChannelPipeline @Override ChannelPipeline flush(); - - /** - * Returns the {@link EventExecutor} which is used by all {@link ChannelHandler}s in the pipeline. - */ - EventExecutor executor(); } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 57653d719c..c156f7239b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -78,7 +78,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { public DefaultChannelPipeline(Channel channel) { this.channel = requireNonNull(channel, "channel"); - succeededFuture = DefaultPromise.newSuccessfulPromise(channel.eventLoop(), null); + succeededFuture = DefaultPromise.newSuccessfulPromise(channel.executor(), null); tail = new DefaultChannelHandlerContext(this, TAIL_NAME, TAIL_HANDLER); head = new DefaultChannelHandlerContext(this, HEAD_NAME, HEAD_HANDLER); @@ -132,7 +132,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public final EventExecutor executor() { - return channel().eventLoop(); + return channel().executor(); } @Override diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 22112e36ed..26bd3efcad 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -520,7 +520,7 @@ public class EmbeddedChannel extends AbstractChannel { runPendingTasks(); if (cancel) { // Cancel all scheduled tasks that are left. - ((EmbeddedEventLoop) eventLoop()).cancelScheduled(); + ((EmbeddedEventLoop) executor()).cancelScheduled(); } } @@ -556,7 +556,7 @@ public class EmbeddedChannel extends AbstractChannel { * for this {@link Channel} */ public void runPendingTasks() { - EmbeddedEventLoop embeddedEventLoop = (EmbeddedEventLoop) eventLoop(); + EmbeddedEventLoop embeddedEventLoop = (EmbeddedEventLoop) executor(); try { embeddedEventLoop.runTasks(); } catch (Exception e) { @@ -572,7 +572,7 @@ public class EmbeddedChannel extends AbstractChannel { * {@code -1}. */ public long runScheduledPendingTasks() { - EmbeddedEventLoop embeddedEventLoop = (EmbeddedEventLoop) eventLoop(); + EmbeddedEventLoop embeddedEventLoop = (EmbeddedEventLoop) executor(); try { return embeddedEventLoop.runScheduledTasks(); @@ -746,7 +746,7 @@ public class EmbeddedChannel extends AbstractChannel { } private void mayRunPendingTasks() { - if (!((EmbeddedEventLoop) eventLoop()).running) { + if (!((EmbeddedEventLoop) executor()).running) { runPendingTasks(); } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index ecc3205c3c..3c1ec6bd8f 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -186,7 +186,7 @@ public class LocalChannel extends AbstractChannel { // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true. // This ensures that if both channels are on the same event loop, the peer's channelInActive // event is triggered *after* this peer's channelInActive event - EventLoop peerEventLoop = peer.eventLoop(); + EventLoop peerEventLoop = peer.executor(); final boolean peerIsActive = peer.isActive(); try { peerEventLoop.execute(() -> peer.tryClose(peerIsActive)); @@ -269,7 +269,7 @@ public class LocalChannel extends AbstractChannel { } } else { try { - eventLoop().execute(readTask); + executor().execute(readTask); } catch (Throwable cause) { logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause); close(); @@ -331,7 +331,7 @@ public class LocalChannel extends AbstractChannel { private void finishPeerRead(final LocalChannel peer) { // If the peer is also writing, then we must schedule the event on the event loop to preserve read order. - if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) { + if (peer.executor() == executor() && !peer.writeInProgress) { finishPeerRead0(peer); } else { runFinishPeerReadTask(peer); @@ -344,9 +344,9 @@ public class LocalChannel extends AbstractChannel { final Runnable finishPeerReadTask = () -> finishPeerRead0(peer); try { if (peer.writeInProgress) { - peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask); + peer.finishReadFuture = peer.executor().submit(finishPeerReadTask); } else { - peer.eventLoop().execute(finishPeerReadTask); + peer.executor().execute(finishPeerReadTask); } } catch (Throwable cause) { logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause); @@ -357,7 +357,7 @@ public class LocalChannel extends AbstractChannel { } private void releaseInboundBuffers() { - assert eventLoop() == null || eventLoop().inEventLoop(); + assert executor() == null || executor().inEventLoop(); readInProgress = false; Queue inboundBuffer = this.inboundBuffer; Object msg; @@ -456,7 +456,7 @@ public class LocalChannel extends AbstractChannel { // This ensures that if both channels are on the same event loop, the peer's channelActive // event is triggered *after* this channel's channelRegistered event, so that this channel's // pipeline is fully initialized by ChannelInitializer before any channelRead events. - peer.eventLoop().execute(() -> { + peer.executor().execute(() -> { Promise promise = peer.connectPromise; // Only trigger fireChannelActive() if the promise was not null and was not completed yet. diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index 3eed1630d1..8c8c97bc75 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -112,10 +112,10 @@ public class LocalServerChannel extends AbstractServerChannel { LocalChannel serve(final LocalChannel peer) { final LocalChannel child = newLocalChannel(peer); - if (eventLoop().inEventLoop()) { + if (executor().inEventLoop()) { serve0(child); } else { - eventLoop().execute(() -> serve0(child)); + executor().execute(() -> serve0(child)); } return child; } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 3d794973ca..cb0346e7ea 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -302,7 +302,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { clearOpWrite(); // Schedule flush again later so other tasks can be picked up in the meantime - eventLoop().execute(flushTask); + executor().execute(flushTask); } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index e3c705852a..69223eba3f 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -125,7 +125,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Deprecated protected void setReadPending(final boolean readPending) { if (isRegistered()) { - EventLoop eventLoop = eventLoop(); + EventLoop eventLoop = executor(); if (eventLoop.inEventLoop()) { setReadPending0(readPending); } else { @@ -144,7 +144,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { */ protected final void clearReadPending() { if (isRegistered()) { - EventLoop eventLoop = eventLoop(); + EventLoop eventLoop = executor(); if (eventLoop.inEventLoop()) { clearReadPending0(); } else { @@ -237,7 +237,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(() -> { + connectTimeoutFuture = executor().schedule(() -> { Promise connectPromise = AbstractNioChannel.this.connectPromise; if (connectPromise != null && !connectPromise.isDone() && connectPromise.tryFailure(new ConnectTimeoutException( @@ -305,7 +305,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); try { boolean wasActive = isActive(); @@ -348,12 +348,12 @@ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected void doRegister() throws Exception { - eventLoop().unsafe().register(this); + executor().unsafe().register(this); } @Override protected void doDeregister() throws Exception { - eventLoop().unsafe().deregister(this); + executor().unsafe().deregister(this); } @Override diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index e5a273ce66..ec4d839180 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -66,7 +66,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { @Override public void read() { - assert eventLoop().inEventLoop(); + assert executor().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 45d1a0cd25..06aa4047c3 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -164,7 +164,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override public Future shutdownOutput(final Promise promise) { - final EventLoop loop = eventLoop(); + final EventLoop loop = executor(); if (loop.inEventLoop()) { ((AbstractUnsafe) unsafe()).shutdownOutput(promise); } else { @@ -185,7 +185,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override public Future shutdownInput(final Promise promise) { - EventLoop loop = eventLoop(); + EventLoop loop = executor(); if (loop.inEventLoop()) { shutdownInput0(promise); } else { diff --git a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java index 3e85978231..b531e6b75d 100644 --- a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java +++ b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java @@ -199,7 +199,7 @@ public class BootstrapTest { registerHandler.registerPromise().setSuccess(null); final BlockingQueue queue = new LinkedBlockingQueue<>(); future.addListener(fut -> { - queue.add(fut.getNow().eventLoop().inEventLoop(Thread.currentThread())); + queue.add(fut.getNow().executor().inEventLoop(Thread.currentThread())); queue.add(fut.isSuccess()); }); assertTrue(queue.take()); diff --git a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java index a8d48f9850..326f13b245 100644 --- a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java @@ -134,7 +134,7 @@ public class ChannelInitializerTest { try { // Execute some task on the EventLoop and wait until its done to be sure all handlers are added to the // pipeline. - channel.eventLoop().submit(() -> { + channel.executor().submit(() -> { // NOOP }).syncUninterruptibly(); Iterator> handlers = channel.pipeline().iterator(); @@ -171,7 +171,7 @@ public class ChannelInitializerTest { try { // Execute some task on the EventLoop and wait until its done to be sure all handlers are added to the // pipeline. - channel.eventLoop().submit(() -> { + channel.executor().submit(() -> { // NOOP }).syncUninterruptibly(); assertEquals(1, initChannelCalled.get()); diff --git a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java index 5d1a600853..b95333d0d9 100644 --- a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java @@ -392,7 +392,7 @@ public class ChannelOutboundBufferTest { ChannelOutboundBuffer cob = ch.unsafe().outboundBuffer(); - ch.eventLoop().execute(() -> { + ch.executor().execute(() -> { // Trigger channelWritabilityChanged() by writing a lot. ch.write(buffer().writeZero(257)); assertThat(buf.toString(), is("false ")); diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 4e44dfb5f4..b96d5fd9ed 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -539,7 +539,7 @@ public class DefaultChannelPipelineTest { // Add handler. p.addFirst(handler.name, handler); - self.eventLoop().execute(() -> { + self.executor().execute(() -> { // Validate handler life-cycle methods called. handler.validate(true, false); @@ -559,7 +559,7 @@ public class DefaultChannelPipelineTest { for (final LifeCycleAwareTestHandler handler : handlers) { assertSame(handler, p.remove(handler.name)); - self.eventLoop().execute(() -> { + self.executor().execute(() -> { // Validate handler life-cycle methods called. handler.validate(true, true); removeLatch.countDown(); @@ -576,7 +576,7 @@ public class DefaultChannelPipelineTest { setUp(handler1, handler2); - self.eventLoop().submit(() -> { + self.executor().submit(() -> { ChannelPipeline p = self.pipeline(); handler1.inboundBuffer.add(8); assertEquals(8, handler1.inboundBuffer.peek()); @@ -595,7 +595,7 @@ public class DefaultChannelPipelineTest { setUp(handler1, handler2); - self.eventLoop().submit(() -> { + self.executor().submit(() -> { ChannelPipeline p = self.pipeline(); handler2.outboundBuffer.add(8); assertEquals(8, handler2.outboundBuffer.peek()); @@ -614,7 +614,7 @@ public class DefaultChannelPipelineTest { setUp(handler1); - self.eventLoop().submit(() -> { + self.executor().submit(() -> { ChannelPipeline p = self.pipeline(); handler1.outboundBuffer.add(8); assertEquals(8, handler1.outboundBuffer.peek()); @@ -632,7 +632,7 @@ public class DefaultChannelPipelineTest { setUp(handler1); - self.eventLoop().submit(() -> { + self.executor().submit(() -> { ChannelPipeline p = self.pipeline(); handler1.inboundBuffer.add(8); handler1.outboundBuffer.add(8); @@ -657,7 +657,7 @@ public class DefaultChannelPipelineTest { setUp(handler1, handler2, handler3); - self.eventLoop().submit(() -> { + self.executor().submit(() -> { ChannelPipeline p = self.pipeline(); handler2.inboundBuffer.add(8); handler2.outboundBuffer.add(8); @@ -1099,7 +1099,7 @@ public class DefaultChannelPipelineTest { pipeline.channel().closeFuture().syncUninterruptibly(); // Schedule something on the EventLoop to ensure all other scheduled tasks had a chance to complete. - pipeline.channel().eventLoop().submit(() -> { + pipeline.channel().executor().submit(() -> { // NOOP }).syncUninterruptibly(); Error error = errorRef.get(); @@ -1543,7 +1543,7 @@ public class DefaultChannelPipelineTest { }; if (executeInEventLoop) { - pipeline.channel().eventLoop().execute(r); + pipeline.channel().executor().execute(r); } else { r.run(); } diff --git a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java index 051ac9f098..43b016ef96 100644 --- a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java +++ b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java @@ -132,7 +132,7 @@ public class PendingWriteQueueTest { final PendingWriteQueue queue = queueRef.get(); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { // Trigger channelWritabilityChanged() by adding a message that's larger than the high watermark. queue.add(msg, channel.newPromise()); }); @@ -212,7 +212,7 @@ public class PendingWriteQueueTest { promise.addListener(future -> queue.removeAndFailAll(new IllegalStateException())); Promise promise2 = channel.newPromise(); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { queue.add(1L, promise); queue.add(2L, promise2); queue.removeAndFailAll(new Exception()); @@ -244,7 +244,7 @@ public class PendingWriteQueueTest { }); Promise promise2 = channel.newPromise(); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { queue.add(1L, promise); queue.add(2L, promise2); queue.removeAndWriteAll(); @@ -257,7 +257,7 @@ public class PendingWriteQueueTest { assertFalse(promise3.isDone()); assertFalse(promise3.isSuccess()); - channel.eventLoop().execute(queue::removeAndWriteAll); + channel.executor().execute(queue::removeAndWriteAll); assertTrue(promise3.isDone()); assertTrue(promise3.isSuccess()); channel.runPendingTasks(); @@ -284,7 +284,7 @@ public class PendingWriteQueueTest { }); Promise promise2 = channel.newPromise(); promise2.addListener(future -> failOrder.add(2)); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { queue.add(1L, promise); queue.add(2L, promise2); queue.removeAndFailAll(new Exception()); @@ -311,7 +311,7 @@ public class PendingWriteQueueTest { promise.addListener(future -> queue.removeAndWriteAll()); Promise promise2 = channel.newPromise(); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { queue.add(1L, promise); queue.add(2L, promise2); @@ -340,7 +340,7 @@ public class PendingWriteQueueTest { IllegalStateException ex = new IllegalStateException(); Promise promise = channel.newPromise(); - channel.eventLoop().execute(() -> { + channel.executor().execute(() -> { queue.add(1L, promise); queue.removeAndFailAll(ex); }); diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index 1b39f58727..f7ff8ecf96 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -125,7 +125,7 @@ public class EmbeddedChannelTest { public void testScheduling() throws Exception { EmbeddedChannel ch = new EmbeddedChannel(new ChannelHandler() { }); final CountDownLatch latch = new CountDownLatch(2); - ScheduledFuture future = ch.eventLoop().schedule(latch::countDown, 1, TimeUnit.SECONDS); + ScheduledFuture future = ch.executor().schedule(latch::countDown, 1, TimeUnit.SECONDS); future.addListener(future1 -> latch.countDown()); long next = ch.runScheduledPendingTasks(); assertTrue(next > 0); @@ -139,7 +139,7 @@ public class EmbeddedChannelTest { @Test public void testScheduledCancelled() throws Exception { EmbeddedChannel ch = new EmbeddedChannel(new ChannelHandler() { }); - ScheduledFuture future = ch.eventLoop().schedule(() -> { }, 1, TimeUnit.DAYS); + ScheduledFuture future = ch.executor().schedule(() -> { }, 1, TimeUnit.DAYS); ch.finish(); assertTrue(future.isCancelled()); } diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index 1b1e33fa9d..7cd157bf80 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -119,7 +119,7 @@ public class LocalChannelTest { // Connect to the server cc = cb.connect(sc.localAddress()).get(); final Channel ccCpy = cc; - cc.eventLoop().execute(() -> { + cc.executor().execute(() -> { // Send a message event up the pipeline. ccCpy.pipeline().fireChannelRead("Hello, World"); latch.countDown(); @@ -715,7 +715,7 @@ public class LocalChannelTest { cc.pipeline().lastContext().executor().execute(() -> ccCpy.writeAndFlush(data.retainedDuplicate()) .addListener(future -> { - serverChannelCpy.eventLoop().execute(() -> { + serverChannelCpy.executor().execute(() -> { // The point of this test is to write while the peer is closed, so we should // ensure the peer is actually closed before we write. int waitCount = 0; @@ -784,7 +784,7 @@ public class LocalChannelTest { cc = cb.register().get(); final AtomicReference> ref = new AtomicReference<>(); - final Promise assertPromise = cc.eventLoop().newPromise(); + final Promise assertPromise = cc.executor().newPromise(); cc.pipeline().addLast(new TestHandler() { @Override diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java index 4f252a88de..5c8f37fc56 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java @@ -76,7 +76,7 @@ public class LocalTransportThreadModelTest2 { public void close(final Channel localChannel, final LocalHandler localRegistrationHandler) { // we want to make sure we actually shutdown IN the event loop - if (localChannel.eventLoop().inEventLoop()) { + if (localChannel.executor().inEventLoop()) { // Wait until all messages are flushed before closing the channel. if (localRegistrationHandler.lastWriteFuture != null) { localRegistrationHandler.lastWriteFuture.awaitUninterruptibly(); @@ -86,7 +86,7 @@ public class LocalTransportThreadModelTest2 { return; } - localChannel.eventLoop().execute(() -> close(localChannel, localRegistrationHandler)); + localChannel.executor().execute(() -> close(localChannel, localRegistrationHandler)); // Wait until the connection is closed or the connection attempt fails. localChannel.closeFuture().awaitUninterruptibly(); diff --git a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java index 5609092367..29345c23ef 100644 --- a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java +++ b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java @@ -190,7 +190,7 @@ public abstract class AbstractNioChannelTest { T channel = newNioChannel(wrapped); channel.register().syncUninterruptibly(); - assertSame(wrapped, channel.eventLoop()); + assertSame(wrapped, channel.executor()); channel.close().syncUninterruptibly(); eventLoopGroup.shutdownGracefully(); }