From 1fe931b6e29fbb03d362df3b15101887069a583b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 17 Jan 2019 09:17:51 +0100 Subject: [PATCH] Make it possible to use a wrapped EventLoop with a Channel (#8677) Motiviation: Because of how we implemented the registration / deregistration of an EventLoop it was not possible to wrap an EventLoop implementation and use it with a Channel. Modification: - Introduce EventLoop.Unsafe which is responsible for the actual registration. - Move validation of EventLoop / Channel combo to the EventLoop - Add unit test that verifies that wrapping works Result: Be able to wrap an EventLoop and so add some extra functionality. --- .../http2/DefaultHttp2ConnectionTest.java | 6 +- .../http2/Http2ConnectionRoundtripTest.java | 6 +- .../http2/Http2MultiplexCodecBuilderTest.java | 4 +- .../HttpToHttp2ConnectionHandlerTest.java | 6 +- .../http2/InboundHttp2ToHttpAdapterTest.java | 6 +- .../io/netty/example/localecho/LocalEcho.java | 4 +- .../handler/ssl/CipherSuiteCanaryTest.java | 4 +- .../handler/ssl/SniClientJava8TestUtil.java | 4 +- .../io/netty/handler/ssl/SniClientTest.java | 5 +- .../io/netty/handler/ssl/SniHandlerTest.java | 4 +- .../io/netty/handler/ssl/SslHandlerTest.java | 6 +- .../io/netty/handler/ssl/ocsp/OcspTest.java | 4 +- .../ScheduledFutureTaskBenchmark.java | 6 +- ...efaultAuthoritativeDnsServerCacheTest.java | 11 +- .../resolver/dns/DefaultDnsCacheTest.java | 12 +- .../dns/DefaultDnsCnameCacheTest.java | 12 +- .../channel/epoll/AbstractEpollChannel.java | 39 +++--- .../epoll/AbstractEpollServerChannel.java | 5 - .../epoll/AbstractEpollStreamChannel.java | 2 +- .../channel/epoll/EpollDatagramChannel.java | 4 +- .../netty/channel/epoll/EpollEventLoop.java | 50 ++++++++ .../channel/epoll/EpollRegistration.java | 46 +++++++ .../epoll/EpollServerSocketChannel.java | 5 - .../channel/epoll/EpollSocketChannel.java | 2 +- .../channel/kqueue/AbstractKQueueChannel.java | 54 ++++----- .../kqueue/AbstractKQueueServerChannel.java | 5 - .../kqueue/AbstractKQueueStreamChannel.java | 2 +- .../channel/kqueue/KQueueDatagramChannel.java | 2 +- .../netty/channel/kqueue/KQueueEventLoop.java | 56 +++++++-- .../channel/kqueue/KQueueRegistration.java | 35 ++++++ .../kqueue/KQueueServerSocketChannel.java | 5 - .../channel/kqueue/KQueueSocketChannel.java | 2 +- .../io/netty/bootstrap/FailedChannel.java | 5 - .../io/netty/channel/AbstractChannel.java | 14 +-- .../io/netty/channel/DefaultEventLoop.java | 63 ---------- .../netty/channel/DefaultEventLoopGroup.java | 66 ----------- .../main/java/io/netty/channel/EventLoop.java | 21 ++++ .../netty/channel/SingleThreadEventLoop.java | 1 - .../channel/embedded/EmbeddedChannel.java | 8 +- .../channel/embedded/EmbeddedEventLoop.java | 27 +++++ .../io/netty/channel/local/LocalChannel.java | 95 +++++++-------- .../channel/local/LocalChannelUnsafe.java | 23 ++++ .../netty/channel/local/LocalEventLoop.java | 72 +++++++++++ .../channel/local/LocalEventLoopGroup.java | 31 ++++- .../channel/local/LocalServerChannel.java | 40 ++++--- .../netty/channel/nio/AbstractNioChannel.java | 32 +---- .../io/netty/channel/nio/NioEventLoop.java | 47 +++++++- .../io/netty/bootstrap/BootstrapTest.java | 12 +- .../netty/bootstrap/ServerBootstrapTest.java | 5 +- .../io/netty/channel/AbstractChannelTest.java | 7 +- .../io/netty/channel/BaseChannelTest.java | 5 +- .../netty/channel/ChannelInitializerTest.java | 6 +- .../channel/ChannelOutboundBufferTest.java | 8 +- .../DefaultChannelPipelineTailTest.java | 112 +++--------------- .../channel/DefaultChannelPipelineTest.java | 11 +- .../channel/SingleThreadEventLoopTest.java | 20 ++++ .../netty/channel/local/LocalChannelTest.java | 11 +- .../local/LocalTransportThreadModelTest.java | 7 +- .../local/LocalTransportThreadModelTest2.java | 5 +- .../local/LocalTransportThreadModelTest3.java | 5 +- .../socket/nio/AbstractNioChannelTest.java | 87 ++++++++++++++ 61 files changed, 726 insertions(+), 534 deletions(-) create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRegistration.java create mode 100644 transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueRegistration.java delete mode 100644 transport/src/main/java/io/netty/channel/DefaultEventLoop.java delete mode 100644 transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java create mode 100644 transport/src/main/java/io/netty/channel/local/LocalChannelUnsafe.java create mode 100644 transport/src/main/java/io/netty/channel/local/LocalEventLoop.java diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java index 69183e0843..958682c794 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java @@ -17,7 +17,7 @@ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.handler.codec.http2.Http2Connection.Endpoint; import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.util.concurrent.Future; @@ -61,7 +61,7 @@ public class DefaultHttp2ConnectionTest { private DefaultHttp2Connection server; private DefaultHttp2Connection client; - private static DefaultEventLoopGroup group; + private static LocalEventLoopGroup group; @Mock private Http2Connection.Listener clientListener; @@ -71,7 +71,7 @@ public class DefaultHttp2ConnectionTest { @BeforeClass public static void beforeClass() { - group = new DefaultEventLoopGroup(2); + group = new LocalEventLoopGroup(2); } @AfterClass diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java index f4a4b6c94b..af6907002d 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionRoundtripTest.java @@ -29,7 +29,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; @@ -1136,7 +1136,7 @@ public class Http2ConnectionRoundtripTest { final AtomicReference serverHandlerRef = new AtomicReference(); final CountDownLatch serverInitLatch = new CountDownLatch(1); - sb.group(new DefaultEventLoopGroup()); + sb.group(new LocalEventLoopGroup()); sb.channel(LocalServerChannel.class); sb.childHandler(new ChannelInitializer() { @Override @@ -1156,7 +1156,7 @@ public class Http2ConnectionRoundtripTest { } }); - cb.group(new DefaultEventLoopGroup()); + cb.group(new LocalEventLoopGroup()); cb.channel(LocalChannel.class); cb.handler(new ChannelInitializer() { @Override diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilderTest.java index 86ba1290e2..3ebbdcbb69 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexCodecBuilderTest.java @@ -27,10 +27,10 @@ import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; -import io.netty.channel.DefaultEventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; import org.junit.After; import org.junit.AfterClass; @@ -61,7 +61,7 @@ public class Http2MultiplexCodecBuilderTest { @BeforeClass public static void init() { - group = new DefaultEventLoop(); + group = new LocalEventLoopGroup(1); } @Before diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandlerTest.java index 0e4f3e9520..b5925df741 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandlerTest.java @@ -25,7 +25,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; @@ -508,7 +508,7 @@ public class HttpToHttp2ConnectionHandlerTest { sb = new ServerBootstrap(); cb = new Bootstrap(); - sb.group(new DefaultEventLoopGroup()); + sb.group(new LocalEventLoopGroup()); sb.channel(LocalServerChannel.class); sb.childHandler(new ChannelInitializer() { @Override @@ -525,7 +525,7 @@ public class HttpToHttp2ConnectionHandlerTest { } }); - cb.group(new DefaultEventLoopGroup()); + cb.group(new LocalEventLoopGroup()); cb.channel(LocalChannel.class); cb.handler(new ChannelInitializer() { @Override diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java index 8fb4ebfc67..ee9f3d832f 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/InboundHttp2ToHttpAdapterTest.java @@ -26,7 +26,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; @@ -667,7 +667,7 @@ public class InboundHttp2ToHttpAdapterTest { sb = new ServerBootstrap(); cb = new Bootstrap(); - sb.group(new DefaultEventLoopGroup()); + sb.group(new LocalEventLoopGroup()); sb.channel(LocalServerChannel.class); sb.childHandler(new ChannelInitializer() { @Override @@ -695,7 +695,7 @@ public class InboundHttp2ToHttpAdapterTest { } }); - cb.group(new DefaultEventLoopGroup()); + cb.group(new LocalEventLoopGroup()); cb.channel(LocalChannel.class); cb.handler(new ChannelInitializer() { @Override diff --git a/example/src/main/java/io/netty/example/localecho/LocalEcho.java b/example/src/main/java/io/netty/example/localecho/LocalEcho.java index 583ce02e4f..a2cffd3b0b 100644 --- a/example/src/main/java/io/netty/example/localecho/LocalEcho.java +++ b/example/src/main/java/io/netty/example/localecho/LocalEcho.java @@ -20,7 +20,7 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; @@ -40,7 +40,7 @@ public final class LocalEcho { // Address to bind on / connect to. final LocalAddress addr = new LocalAddress(PORT); - EventLoopGroup serverGroup = new DefaultEventLoopGroup(); + EventLoopGroup serverGroup = new LocalEventLoopGroup(); EventLoopGroup clientGroup = new NioEventLoopGroup(); // NIO event loops are also OK try { // Note that we can use any event loop to ensure certain local channels diff --git a/handler/src/test/java/io/netty/handler/ssl/CipherSuiteCanaryTest.java b/handler/src/test/java/io/netty/handler/ssl/CipherSuiteCanaryTest.java index 9c394ccf63..2b4323bc16 100644 --- a/handler/src/test/java/io/netty/handler/ssl/CipherSuiteCanaryTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/CipherSuiteCanaryTest.java @@ -24,7 +24,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.local.LocalAddress; @@ -75,7 +75,7 @@ public class CipherSuiteCanaryTest { @BeforeClass public static void init() throws Exception { - GROUP = new DefaultEventLoopGroup(); + GROUP = new LocalEventLoopGroup(); CERT = new SelfSignedCertificate(); } diff --git a/handler/src/test/java/io/netty/handler/ssl/SniClientJava8TestUtil.java b/handler/src/test/java/io/netty/handler/ssl/SniClientJava8TestUtil.java index 4db7c7b73a..a44878868b 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SniClientJava8TestUtil.java +++ b/handler/src/test/java/io/netty/handler/ssl/SniClientJava8TestUtil.java @@ -22,7 +22,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; @@ -79,7 +79,7 @@ final class SniClientJava8TestUtil { final String sniHost = "sni.netty.io"; SelfSignedCertificate cert = new SelfSignedCertificate(); LocalAddress address = new LocalAddress("test"); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); SslContext sslServerContext = null; SslContext sslClientContext = null; diff --git a/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java b/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java index 56ea815eb8..19298affd9 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SniClientTest.java @@ -20,7 +20,7 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; @@ -29,7 +29,6 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.Mapping; import io.netty.util.ReferenceCountUtil; -import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; import org.junit.Assert; @@ -96,7 +95,7 @@ public class SniClientTest { private static void testSniClient(SslProvider sslServerProvider, SslProvider sslClientProvider) throws Exception { String sniHostName = "sni.netty.io"; LocalAddress address = new LocalAddress("test"); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); SelfSignedCertificate cert = new SelfSignedCertificate(); SslContext sslServerContext = null; SslContext sslClientContext = null; diff --git a/handler/src/test/java/io/netty/handler/ssl/SniHandlerTest.java b/handler/src/test/java/io/netty/handler/ssl/SniHandlerTest.java index d3bf1f2428..976821b936 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SniHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SniHandlerTest.java @@ -27,7 +27,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.local.LocalAddress; @@ -418,7 +418,7 @@ public class SniHandlerTest { case OPENSSL_REFCNT: final String sniHost = "sni.netty.io"; LocalAddress address = new LocalAddress("testReplaceHandler-" + Math.random()); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); Channel sc = null; Channel cc = null; SslContext sslContext = null; diff --git a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java index 772a15f9eb..19f4728ff1 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java @@ -32,7 +32,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelId; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.embedded.EmbeddedChannel; @@ -451,7 +451,7 @@ public class SslHandlerTest { final BlockingQueue events = new LinkedBlockingQueue(); Channel serverChannel = null; Channel clientChannel = null; - EventLoopGroup group = new DefaultEventLoopGroup(); + EventLoopGroup group = new LocalEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.group(group) @@ -614,7 +614,7 @@ public class SslHandlerTest { .trustManager(new SelfSignedCertificate().cert()) .build(); - EventLoopGroup group = new NioEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); Channel sc = null; Channel cc = null; try { diff --git a/handler/src/test/java/io/netty/handler/ssl/ocsp/OcspTest.java b/handler/src/test/java/io/netty/handler/ssl/ocsp/OcspTest.java index 161f52a99f..329218de7b 100644 --- a/handler/src/test/java/io/netty/handler/ssl/ocsp/OcspTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/ocsp/OcspTest.java @@ -26,7 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; @@ -344,7 +344,7 @@ public class OcspTest { .build(); try { - EventLoopGroup group = new DefaultEventLoopGroup(); + EventLoopGroup group = new LocalEventLoopGroup(); try { LocalAddress address = new LocalAddress("handshake-" + Math.random()); Channel server = newServer(group, address, serverSslContext, response, serverHandler); diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java index 3b8d2bb1a2..ecb438fad6 100644 --- a/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java @@ -15,8 +15,8 @@ */ package io.netty.microbench.concurrent; -import io.netty.channel.DefaultEventLoop; -import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ScheduledFuture; @@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) public class ScheduledFutureTaskBenchmark extends AbstractMicrobenchmark { - static final EventLoop executor = new DefaultEventLoop(); + static final EventLoopGroup executor = new LocalEventLoopGroup(1); @State(Scope.Thread) public static class FuturesHolder { diff --git a/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultAuthoritativeDnsServerCacheTest.java b/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultAuthoritativeDnsServerCacheTest.java index ada0eaee26..00c25a16d7 100644 --- a/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultAuthoritativeDnsServerCacheTest.java +++ b/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultAuthoritativeDnsServerCacheTest.java @@ -15,7 +15,7 @@ */ package io.netty.resolver.dns; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -25,7 +25,6 @@ import org.junit.Test; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Comparator; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -39,7 +38,7 @@ public class DefaultAuthoritativeDnsServerCacheTest { InetAddress.getByAddress("ns1", new byte[] { 10, 0, 0, 1 }), 53); InetSocketAddress resolved2 = new InetSocketAddress( InetAddress.getByAddress("ns2", new byte[] { 10, 0, 0, 2 }), 53); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -91,7 +90,7 @@ public class DefaultAuthoritativeDnsServerCacheTest { InetAddress.getByAddress("ns1", new byte[] { 10, 0, 0, 1 }), 53); InetSocketAddress resolved2 = new InetSocketAddress( InetAddress.getByAddress("ns2", new byte[] { 10, 0, 0, 2 }), 53); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -115,7 +114,7 @@ public class DefaultAuthoritativeDnsServerCacheTest { InetAddress.getByAddress("ns2", new byte[] { 10, 0, 0, 2 }), 53); InetSocketAddress resolved2 = new InetSocketAddress( InetAddress.getByAddress("ns1", new byte[] { 10, 0, 0, 1 }), 53); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -153,7 +152,7 @@ public class DefaultAuthoritativeDnsServerCacheTest { InetSocketAddress unresolved = InetSocketAddress.createUnresolved("ns1", 53); InetSocketAddress resolved = new InetSocketAddress( InetAddress.getByAddress("ns2", new byte[] { 10, 0, 0, 2 }), 53); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); diff --git a/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultDnsCacheTest.java b/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultDnsCacheTest.java index d3b2384c71..0898df0cb0 100644 --- a/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultDnsCacheTest.java +++ b/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultDnsCacheTest.java @@ -15,7 +15,7 @@ */ package io.netty.resolver.dns; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -40,7 +40,7 @@ public class DefaultDnsCacheTest { public void testExpire() throws Throwable { InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 }); InetAddress addr2 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 2 }); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -103,7 +103,7 @@ public class DefaultDnsCacheTest { public void testAddMultipleAddressesForSameHostname() throws Exception { InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 }); InetAddress addr2 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 2 }); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -123,7 +123,7 @@ public class DefaultDnsCacheTest { @Test public void testAddSameAddressForSameHostname() throws Exception { InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 }); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -148,7 +148,7 @@ public class DefaultDnsCacheTest { public void testCacheFailed() throws Exception { InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 }); InetAddress addr2 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 2 }); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -177,7 +177,7 @@ public class DefaultDnsCacheTest { public void testDotHandling() throws Exception { InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 }); InetAddress addr2 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 2 }); - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); diff --git a/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultDnsCnameCacheTest.java b/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultDnsCnameCacheTest.java index a08702f5f2..82965d6d24 100644 --- a/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultDnsCnameCacheTest.java +++ b/resolver-dns/src/test/java/io/netty/resolver/dns/DefaultDnsCnameCacheTest.java @@ -15,7 +15,7 @@ */ package io.netty.resolver.dns; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import org.junit.Test; @@ -29,7 +29,7 @@ public class DefaultDnsCnameCacheTest { @Test public void testExpire() throws Throwable { - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -63,7 +63,7 @@ public class DefaultDnsCnameCacheTest { } private static void testExpireWithTTL0(int days) { - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -77,7 +77,7 @@ public class DefaultDnsCnameCacheTest { @Test public void testMultipleCnamesForSameHostname() throws Exception { - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -93,7 +93,7 @@ public class DefaultDnsCnameCacheTest { @Test public void testAddSameCnameForSameHostname() throws Exception { - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); @@ -109,7 +109,7 @@ public class DefaultDnsCnameCacheTest { @Test public void testClear() throws Exception { - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { EventLoop loop = group.next(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 41f9e72237..cb54233038 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -68,6 +68,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann private ChannelPromise connectPromise; private ScheduledFuture connectTimeoutFuture; private SocketAddress requestedRemoteAddress; + private EpollRegistration registration; private volatile SocketAddress local; private volatile SocketAddress remote; @@ -126,6 +127,11 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } } + protected EpollRegistration registration() { + assert registration != null; + return registration; + } + boolean isFlagSet(int flag) { return (flags & flag) != 0; } @@ -199,19 +205,23 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann doClose(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof EpollEventLoop; - } - @Override public boolean isOpen() { return socket.isOpen(); } - @Override - protected void doDeregister() throws Exception { - ((EpollEventLoop) eventLoop()).remove(this); + void register0(EpollRegistration registration) throws Exception { + // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop + // make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the + // new EventLoop. + epollInReadyRunnablePending = false; + this.registration = registration; + } + + void deregister0() throws Exception { + if (registration != null) { + registration.remove(); + } } @Override @@ -268,20 +278,11 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } private void modifyEvents() throws IOException { - if (isOpen() && isRegistered()) { - ((EpollEventLoop) eventLoop()).modify(this); + if (isOpen() && isRegistered() && registration != null) { + registration.update(); } } - @Override - protected void doRegister() throws Exception { - // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop - // make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the - // new EventLoop. - epollInReadyRunnablePending = false; - ((EpollEventLoop) eventLoop()).add(this); - } - @Override protected abstract AbstractEpollUnsafe newUnsafe(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index 31b01a19a8..d5cbd087f3 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -58,11 +58,6 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im return METADATA; } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof EpollEventLoop; - } - @Override protected InetSocketAddress remoteAddress0() { return null; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 4d781185e1..4c5a3df982 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -513,7 +513,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im */ private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception { final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite(); - IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray(); + IovArray array = registration().cleanIovArray(); array.maxBytes(maxBytesPerGatheringWrite); in.forEachFlushedMessage(array); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index c21b0459a3..67f863b6c0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -271,7 +271,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements try { // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+ if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) { - NativeDatagramPacketArray array = ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray(); + NativeDatagramPacketArray array = registration().cleanDatagramPacketArray(); in.forEachFlushedMessage(array); int cnt = array.count(); @@ -349,7 +349,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements remoteAddress.getAddress(), remoteAddress.getPort()); } } else if (data.nioBufferCount() > 1) { - IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray(); + IovArray array = registration().cleanIovArray(); array.add(data); int cnt = array.count(); assert cnt != 0; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 3420d67bc8..65f97e00e4 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -15,6 +15,7 @@ */ package io.netty.channel.epoll; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.SelectStrategy; @@ -28,6 +29,7 @@ import io.netty.util.collection.IntObjectMap; import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -79,6 +81,49 @@ class EpollEventLoop extends SingleThreadEventLoop { // See http://man7.org/linux/man-pages/man2/timerfd_create.2.html. private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999; + private static AbstractEpollChannel cast(Channel channel) { + if (channel instanceof AbstractEpollChannel) { + return (AbstractEpollChannel) channel; + } + throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported"); + } + + private final Unsafe unsafe = new Unsafe() { + @Override + public void register(Channel channel) throws Exception { + assert inEventLoop(); + final AbstractEpollChannel epollChannel = cast(channel); + epollChannel.register0(new EpollRegistration() { + @Override + public void update() throws IOException { + EpollEventLoop.this.modify(epollChannel); + } + + @Override + public void remove() throws IOException { + EpollEventLoop.this.remove(epollChannel); + } + + @Override + public IovArray cleanIovArray() { + return EpollEventLoop.this.cleanIovArray(); + } + + @Override + public NativeDatagramPacketArray cleanDatagramPacketArray() { + return EpollEventLoop.this.cleanDatagramPacketArray(); + } + }); + add(epollChannel); + } + + @Override + public void deregister(Channel channel) throws Exception { + assert inEventLoop(); + cast(channel).deregister0(); + } + }; + EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); @@ -160,6 +205,11 @@ class EpollEventLoop extends SingleThreadEventLoop { return datagramPacketArray; } + @Override + public Unsafe unsafe() { + return unsafe; + } + @Override protected void wakeup(boolean inEventLoop) { if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRegistration.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRegistration.java new file mode 100644 index 0000000000..2e695885c6 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRegistration.java @@ -0,0 +1,46 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.unix.IovArray; + +import java.io.IOException; + +/** + * Registration with an {@link EpollEventLoop}. + */ +interface EpollRegistration { + + /** + * Update the registration as some flags did change. + */ + void update() throws IOException; + + /** + * Remove the registration. No more IO will be handled for it. + */ + void remove() throws IOException; + + /** + * Returns an {@link IovArray} that can be used for {@code writev}. + */ + IovArray cleanIovArray(); + + /** + * Returns a {@link NativeDatagramPacketArray} that can used for {@code sendmmsg}. + */ + NativeDatagramPacketArray cleanDatagramPacketArray(); +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index d8ea2db414..1df86beca4 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -61,11 +61,6 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i config = new EpollServerSocketChannelConfig(this); } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof EpollEventLoop; - } - @Override protected void doBind(SocketAddress localAddress) throws Exception { super.doBind(localAddress); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 1bb6ac3fea..8a51be97c1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -122,7 +122,7 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme // because we try to read or write until the actual close happens which may be later due // SO_LINGER handling. // See https://github.com/netty/netty/issues/4449 - ((EpollEventLoop) eventLoop()).remove(EpollSocketChannel.this); + doDeregister(); return GlobalEventExecutor.INSTANCE; } } catch (Throwable ignore) { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index a7f737687e..5828e334ec 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -63,12 +63,14 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan private ChannelPromise connectPromise; private ScheduledFuture connectTimeoutFuture; private SocketAddress requestedRemoteAddress; + private KQueueRegistration registration; final BsdSocket socket; private boolean readFilterEnabled; private boolean writeFilterEnabled; boolean readReadyRunnablePending; boolean inputClosedSeenErrorOnRead; + protected volatile boolean active; private volatile SocketAddress local; private volatile SocketAddress remote; @@ -103,6 +105,11 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan } } + protected KQueueRegistration registration() { + assert registration != null; + return registration; + } + @Override public final FileDescriptor fd() { return socket; @@ -160,26 +167,11 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan doClose(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof KQueueEventLoop; - } - @Override public boolean isOpen() { return socket.isOpen(); } - @Override - protected void doDeregister() throws Exception { - // Make sure we unregister our filters from kqueue! - readFilter(false); - writeFilter(false); - evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0); - - ((KQueueEventLoop) eventLoop()).remove(this); - } - @Override protected final void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called @@ -198,23 +190,31 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan } } - @Override - protected void doRegister() throws Exception { + void register0(KQueueRegistration registration) { + this.registration = registration; // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop // make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the // new EventLoop. readReadyRunnablePending = false; - ((KQueueEventLoop) eventLoop()).add(this); - // Add the write event first so we get notified of connection refused on the client side! if (writeFilterEnabled) { - evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE); + evSet0(registration, Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE); } if (readFilterEnabled) { - evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE); + evSet0(registration, Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE); + } + evSet0(registration, Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP); + } + + void deregister0() throws IOException { + // Make sure we unregister our filters from kqueue! + readFilter(false); + writeFilter(false); + if (registration != null) { + evSet0(registration, Native.EVFILT_SOCK, Native.EV_DELETE, 0); + registration = null; } - evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP); } @Override @@ -360,16 +360,16 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan private void evSet(short filter, short flags) { if (isOpen() && isRegistered()) { - evSet0(filter, flags); + evSet0(registration, filter, flags); } } - private void evSet0(short filter, short flags) { - evSet0(filter, flags, 0); + private void evSet0(KQueueRegistration registration, short filter, short flags) { + evSet0(registration, filter, flags, 0); } - private void evSet0(short filter, short flags, int fflags) { - ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags); + private void evSet0(KQueueRegistration registration, short filter, short flags, int fflags) { + registration.evSet(filter, flags, fflags); } abstract class AbstractKQueueUnsafe extends AbstractUnsafe { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java index 25389ffbd2..acfb055146 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java @@ -53,11 +53,6 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel return METADATA; } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof KQueueEventLoop; - } - @Override protected InetSocketAddress remoteAddress0() { return null; diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index 094f13f756..37aa526c9f 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -344,7 +344,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel */ private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception { final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite(); - IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray(); + IovArray array = registration().cleanArray(); array.maxBytes(maxBytesPerGatheringWrite); in.forEachFlushedMessage(array); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index 1e6b30b7c6..09476a5f4b 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -321,7 +321,7 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement remoteAddress.getAddress(), remoteAddress.getPort()); } } else if (data.nioBufferCount() > 1) { - IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray(); + IovArray array = registration().cleanArray(); array.add(data); int cnt = array.count(); assert cnt != 0; diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index 25c50a4f05..a4eaec4e8e 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -15,6 +15,7 @@ */ package io.netty.channel.kqueue; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.SelectStrategy; @@ -28,6 +29,7 @@ import io.netty.util.collection.IntObjectMap; import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -70,6 +72,43 @@ final class KQueueEventLoop extends SingleThreadEventLoop { private volatile int wakenUp; private volatile int ioRatio = 50; + private static AbstractKQueueChannel cast(Channel channel) { + if (channel instanceof AbstractKQueueChannel) { + return (AbstractKQueueChannel) channel; + } + throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported"); + } + + private final Unsafe unsafe = new Unsafe() { + @Override + public void register(Channel channel) { + assert inEventLoop(); + final AbstractKQueueChannel kQueueChannel = cast(channel); + final int id = kQueueChannel.fd().intValue(); + channels.put(id, kQueueChannel); + + kQueueChannel.register0(new KQueueRegistration() { + @Override + public void evSet(short filter, short flags, int fflags) { + KQueueEventLoop.this.evSet(kQueueChannel, filter, flags, fflags); + } + + @Override + public IovArray cleanArray() { + return KQueueEventLoop.this.cleanArray(); + } + }); + } + + @Override + public void deregister(Channel channel) throws Exception { + assert inEventLoop(); + AbstractKQueueChannel kQueueChannel = cast(channel); + channels.remove(kQueueChannel.fd().intValue()); + kQueueChannel.deregister0(); + } + }; + KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); @@ -90,24 +129,19 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } } - void add(AbstractKQueueChannel ch) { - assert inEventLoop(); - channels.put(ch.fd().intValue(), ch); + @Override + public Unsafe unsafe() { + return unsafe; } - void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) { + private void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) { changeList.evSet(ch, filter, flags, fflags); } - void remove(AbstractKQueueChannel ch) { - assert inEventLoop(); - channels.remove(ch.fd().intValue()); - } - /** * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}. */ - IovArray cleanArray() { + private IovArray cleanArray() { iovArray.clear(); return iovArray; } @@ -167,7 +201,7 @@ final class KQueueEventLoop extends SingleThreadEventLoop { // This may happen if the channel has already been closed, and it will be removed from kqueue anyways. // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and // thus removed from kqueue FD. - logger.warn("events[{}]=[{}, {}] had no channel!", i, eventList.fd(i), filter); + logger.warn("events[{}]=[{}, {}] had no channel!", i, fd, filter); continue; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueRegistration.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueRegistration.java new file mode 100644 index 0000000000..09dc033b30 --- /dev/null +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueRegistration.java @@ -0,0 +1,35 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.kqueue; + +import io.netty.channel.unix.IovArray; + + +/** + * Registration with an {@link KQueueEventLoop}. + */ +interface KQueueRegistration { + + /** + * Update the event-set for the registration. + */ + void evSet(short filter, short flags, int fflags); + + /** + * Returns an {@link IovArray} that can be used for {@code writev}. + */ + IovArray cleanArray(); +} diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueServerSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueServerSocketChannel.java index 38a01460b2..14091fbacf 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueServerSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueServerSocketChannel.java @@ -52,11 +52,6 @@ public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel config = new KQueueServerSocketChannelConfig(this); } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof KQueueEventLoop; - } - @Override protected void doBind(SocketAddress localAddress) throws Exception { super.doBind(localAddress); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java index 64b3847550..edd7a70b59 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java @@ -80,7 +80,7 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple // because we try to read or write until the actual close happens which may be later due // SO_LINGER handling. // See https://github.com/netty/netty/issues/4449 - ((KQueueEventLoop) eventLoop()).remove(KQueueSocketChannel.this); + doDeregister(); return GlobalEventExecutor.INSTANCE; } } catch (Throwable ignore) { diff --git a/transport/src/main/java/io/netty/bootstrap/FailedChannel.java b/transport/src/main/java/io/netty/bootstrap/FailedChannel.java index ee5b8fbc42..e1dcedf9eb 100644 --- a/transport/src/main/java/io/netty/bootstrap/FailedChannel.java +++ b/transport/src/main/java/io/netty/bootstrap/FailedChannel.java @@ -38,11 +38,6 @@ final class FailedChannel extends AbstractChannel { return new FailedChannelUnsafe(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return true; - } - @Override protected SocketAddress localAddress0() { return null; diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index e6c97c95d8..277f8e2213 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -102,10 +102,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } private EventLoop validateEventLoop(EventLoop eventLoop) { - if (!isCompatible(ObjectUtil.checkNotNull(eventLoop, "eventLoop"))) { - throw new IllegalArgumentException("incompatible event loop type: " + eventLoop.getClass().getName()); - } - return eventLoop; + return ObjectUtil.checkNotNull(eventLoop, "eventLoop"); } @Override @@ -1035,11 +1032,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - /** - * Return {@code true} if the given {@link EventLoop} is compatible with this instance. - */ - protected abstract boolean isCompatible(EventLoop loop); - /** * Returns the {@link SocketAddress} which is bound locally. */ @@ -1056,7 +1048,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha * Sub-classes may override this method */ protected void doRegister() throws Exception { - // NOOP + eventLoop().unsafe().register(this); } /** @@ -1089,7 +1081,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha * Sub-classes may override this method */ protected void doDeregister() throws Exception { - // NOOP + eventLoop().unsafe().deregister(this); } /** diff --git a/transport/src/main/java/io/netty/channel/DefaultEventLoop.java b/transport/src/main/java/io/netty/channel/DefaultEventLoop.java deleted file mode 100644 index 53d2e3b12b..0000000000 --- a/transport/src/main/java/io/netty/channel/DefaultEventLoop.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import io.netty.util.concurrent.DefaultThreadFactory; - -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; - -public class DefaultEventLoop extends SingleThreadEventLoop { - - public DefaultEventLoop() { - this((EventLoopGroup) null); - } - - public DefaultEventLoop(ThreadFactory threadFactory) { - this(null, threadFactory); - } - - public DefaultEventLoop(Executor executor) { - this(null, executor); - } - - public DefaultEventLoop(EventLoopGroup parent) { - this(parent, new DefaultThreadFactory(DefaultEventLoop.class)); - } - - public DefaultEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) { - super(parent, threadFactory, true); - } - - public DefaultEventLoop(EventLoopGroup parent, Executor executor) { - super(parent, executor, true); - } - - @Override - protected void run() { - for (;;) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - updateLastExecutionTime(); - } - - if (confirmShutdown()) { - break; - } - } - } -} diff --git a/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java b/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java deleted file mode 100644 index ee5eeea641..0000000000 --- a/transport/src/main/java/io/netty/channel/DefaultEventLoopGroup.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; - -/** - * {@link MultithreadEventLoopGroup} which must be used for the local transport. - */ -public class DefaultEventLoopGroup extends MultithreadEventLoopGroup { - - /** - * Create a new instance with the default number of threads. - */ - public DefaultEventLoopGroup() { - this(0); - } - - /** - * Create a new instance - * - * @param nThreads the number of threads to use - */ - public DefaultEventLoopGroup(int nThreads) { - this(nThreads, (ThreadFactory) null); - } - - /** - * Create a new instance - * - * @param nThreads the number of threads to use - * @param threadFactory the {@link ThreadFactory} or {@code null} to use the default - */ - public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - super(nThreads, threadFactory); - } - - /** - * Create a new instance - * - * @param nThreads the number of threads to use - * @param executor the Executor to use, or {@code null} if the default should be used. - */ - public DefaultEventLoopGroup(int nThreads, Executor executor) { - super(nThreads, executor); - } - - @Override - protected EventLoop newChild(Executor executor, Object... args) throws Exception { - return new DefaultEventLoop(this, executor); - } -} diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java index e58b9f22ca..cfd2b907ec 100644 --- a/transport/src/main/java/io/netty/channel/EventLoop.java +++ b/transport/src/main/java/io/netty/channel/EventLoop.java @@ -27,4 +27,25 @@ import io.netty.util.concurrent.OrderedEventExecutor; public interface EventLoop extends OrderedEventExecutor, EventLoopGroup { @Override EventLoopGroup parent(); + + /** + * Returns an internal-use-only object that provides unsafe operations. + */ + Unsafe unsafe(); + + /** + * Unsafe operations that should never be called from user-code. These methods + * are only provided to implement the actual transport, and must be invoked from the {@link EventLoop} itself. + */ + interface Unsafe { + /** + * Register the {@link Channel} to the {@link EventLoop}. + */ + void register(Channel channel) throws Exception; + + /** + * Deregister the {@link Channel} from the {@link EventLoop}. + */ + void deregister(Channel channel) throws Exception; + } } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 36fce70680..72f048a5b3 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -18,7 +18,6 @@ package io.netty.channel; import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.concurrent.RejectedExecutionHandlers; import io.netty.util.concurrent.SingleThreadEventExecutor; -import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.SystemPropertyUtil; import java.util.concurrent.Executor; diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 1118839545..8119e6afbd 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -673,11 +673,6 @@ public class EmbeddedChannel extends AbstractChannel { } } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof EmbeddedEventLoop; - } - @Override protected SocketAddress localAddress0() { return isActive()? LOCAL_ADDRESS : null; @@ -688,8 +683,7 @@ public class EmbeddedChannel extends AbstractChannel { return isActive()? REMOTE_ADDRESS : null; } - @Override - protected void doRegister() throws Exception { + void setActive() { state = State.ACTIVE; } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index d8017ecdf7..b0534a43e8 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -15,10 +15,12 @@ */ package io.netty.channel.embedded; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.AbstractScheduledEventExecutor; import io.netty.util.concurrent.Future; +import io.netty.util.internal.StringUtil; import java.util.ArrayDeque; import java.util.Queue; @@ -28,6 +30,31 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements private final Queue tasks = new ArrayDeque(2); + private static EmbeddedChannel cast(Channel channel) { + if (channel instanceof EmbeddedChannel) { + return (EmbeddedChannel) channel; + } + throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported"); + } + + private final Unsafe unsafe = new Unsafe() { + @Override + public void register(Channel channel) { + assert inEventLoop(); + cast(channel).setActive(); + } + + @Override + public void deregister(Channel channel) { + assert inEventLoop(); + } + }; + + @Override + public Unsafe unsafe() { + return unsafe; + } + @Override public EventLoopGroup parent() { return (EventLoopGroup) super.parent(); diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index be216973fa..63f6327af6 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -26,10 +26,8 @@ import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.PreferHeapByteBufAllocator; import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.SingleThreadEventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ThrowableUtil; @@ -144,11 +142,6 @@ public class LocalChannel extends AbstractChannel { return new LocalUnsafe(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof SingleThreadEventLoop; - } - @Override protected SocketAddress localAddress0() { return localAddress; @@ -159,43 +152,6 @@ public class LocalChannel extends AbstractChannel { return remoteAddress; } - @Override - protected void doRegister() throws Exception { - // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel. - // This is needed as a peer may not be null also if a LocalChannel was connected before and - // deregistered / registered later again. - // - // See https://github.com/netty/netty/issues/2400 - if (peer != null && parent() != null) { - // Store the peer in a local variable as it may be set to null if doClose() is called. - // See https://github.com/netty/netty/issues/2144 - final LocalChannel peer = this.peer; - state = State.CONNECTED; - - peer.remoteAddress = parent() == null ? null : parent().localAddress(); - peer.state = State.CONNECTED; - - // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true. - // This ensures that if both channels are on the same event loop, the peer's channelActive - // event is triggered *after* this channel's channelRegistered event, so that this channel's - // pipeline is fully initialized by ChannelInitializer before any channelRead events. - peer.eventLoop().execute(new Runnable() { - @Override - public void run() { - ChannelPromise promise = peer.connectPromise; - - // Only trigger fireChannelActive() if the promise was not null and was not completed yet. - // connectPromise may be set to null if doClose() was called in the meantime. - if (promise != null && promise.trySuccess()) { - peer.pipeline().fireChannelActive(); - peer.readIfIsAutoRead(); - } - } - }); - } - ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook); - } - @Override protected void doBind(SocketAddress localAddress) throws Exception { this.localAddress = @@ -294,12 +250,6 @@ public class LocalChannel extends AbstractChannel { } } - @Override - protected void doDeregister() throws Exception { - // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop - ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook); - } - private void readInbound() { RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle(); handle.reset(config()); @@ -456,7 +406,7 @@ public class LocalChannel extends AbstractChannel { } } - private class LocalUnsafe extends AbstractUnsafe { + private class LocalUnsafe extends AbstractUnsafe implements LocalChannelUnsafe { @Override public void connect(final SocketAddress remoteAddress, @@ -506,5 +456,48 @@ public class LocalChannel extends AbstractChannel { LocalServerChannel serverChannel = (LocalServerChannel) boundChannel; peer = serverChannel.serve(LocalChannel.this); } + + @Override + public void register0(LocalEventLoop eventLoop) { + // Check if both peer and parent are non-null because this channel was created by a LocalServerChannel. + // This is needed as a peer may not be null also if a LocalChannel was connected before and + // deregistered / registered later again. + // + // See https://github.com/netty/netty/issues/2400 + if (peer != null && parent() != null) { + // Store the peer in a local variable as it may be set to null if doClose() is called. + // See https://github.com/netty/netty/issues/2144 + final LocalChannel peer = LocalChannel.this.peer; + state = State.CONNECTED; + + peer.remoteAddress = parent() == null ? null : parent().localAddress(); + peer.state = State.CONNECTED; + + // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true. + // This ensures that if both channels are on the same event loop, the peer's channelActive + // event is triggered *after* this channel's channelRegistered event, so that this channel's + // pipeline is fully initialized by ChannelInitializer before any channelRead events. + peer.eventLoop().execute(new Runnable() { + @Override + public void run() { + ChannelPromise promise = peer.connectPromise; + + // Only trigger fireChannelActive() if the promise was not null and was not completed yet. + // connectPromise may be set to null if doClose() was called in the meantime. + if (promise != null && promise.trySuccess()) { + peer.pipeline().fireChannelActive(); + peer.readIfIsAutoRead(); + } + } + }); + } + eventLoop.addShutdownHook(shutdownHook); + } + + @Override + public void deregister0(LocalEventLoop eventLoop) { + // Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop + eventLoop.removeShutdownHook(shutdownHook); + } } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannelUnsafe.java b/transport/src/main/java/io/netty/channel/local/LocalChannelUnsafe.java new file mode 100644 index 0000000000..80381159c2 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/local/LocalChannelUnsafe.java @@ -0,0 +1,23 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.local; + +import io.netty.channel.Channel; + +interface LocalChannelUnsafe extends Channel.Unsafe { + void register0(LocalEventLoop eventLoop); + void deregister0(LocalEventLoop eventLoop); +} diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java new file mode 100644 index 0000000000..642d44e22f --- /dev/null +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoop.java @@ -0,0 +1,72 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.local; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SingleThreadEventLoop; +import io.netty.util.internal.StringUtil; + +import java.util.concurrent.Executor; + +class LocalEventLoop extends SingleThreadEventLoop { + + private static LocalChannelUnsafe cast(Channel channel) { + Channel.Unsafe unsafe = channel.unsafe(); + if (unsafe instanceof LocalChannelUnsafe) { + return (LocalChannelUnsafe) unsafe; + } + throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported"); + } + + private final Unsafe unsafe = new Unsafe() { + @Override + public void register(Channel channel) { + assert inEventLoop(); + cast(channel).register0(LocalEventLoop.this); + } + + @Override + public void deregister(Channel channel) { + assert inEventLoop(); + cast(channel).deregister0(LocalEventLoop.this); + } + }; + + LocalEventLoop(EventLoopGroup parent, Executor executor) { + super(parent, executor, true); + } + + @Override + public Unsafe unsafe() { + return unsafe; + } + + @Override + protected void run() { + for (;;) { + Runnable task = takeTask(); + if (task != null) { + task.run(); + updateLastExecutionTime(); + } + + if (confirmShutdown()) { + break; + } + } + } +} diff --git a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java index 2bd3ff611e..7497d3da59 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/local/LocalEventLoopGroup.java @@ -15,20 +15,23 @@ */ package io.netty.channel.local; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoop; +import io.netty.channel.MultithreadEventLoopGroup; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** - * @deprecated Use {@link DefaultEventLoopGroup} instead. + * {@link MultithreadEventLoopGroup} which must be used for the local transport. */ -@Deprecated -public class LocalEventLoopGroup extends DefaultEventLoopGroup { +public class LocalEventLoopGroup extends MultithreadEventLoopGroup { /** * Create a new instance with the default number of threads. */ - public LocalEventLoopGroup() { } + public LocalEventLoopGroup() { + this(0); + } /** * Create a new instance @@ -36,7 +39,7 @@ public class LocalEventLoopGroup extends DefaultEventLoopGroup { * @param nThreads the number of threads to use */ public LocalEventLoopGroup(int nThreads) { - super(nThreads); + this(nThreads, (ThreadFactory) null); } /** @@ -48,4 +51,20 @@ public class LocalEventLoopGroup extends DefaultEventLoopGroup { public LocalEventLoopGroup(int nThreads, ThreadFactory threadFactory) { super(nThreads, threadFactory); } + + /** + * Create a new instance + * + * @param nThreads the number of threads to use + * @param executor the Executor to use, or {@code null} if the default should be used. + */ + public LocalEventLoopGroup(int nThreads, Executor executor) { + super(nThreads, executor); + } + + @Override + protected EventLoop newChild(Executor executor, Object... args) throws Exception { + assert args == null || args.length == 0; + return new LocalEventLoop(this, executor); + } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index 114c705469..277c7d49e0 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -18,14 +18,13 @@ package io.netty.channel.local; import io.netty.channel.AbstractServerChannel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.PreferHeapByteBufAllocator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.ServerChannel; -import io.netty.channel.SingleThreadEventLoop; -import io.netty.util.concurrent.SingleThreadEventExecutor; import java.net.SocketAddress; import java.util.ArrayDeque; @@ -79,21 +78,11 @@ public class LocalServerChannel extends AbstractServerChannel { return state == 1; } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof SingleThreadEventLoop; - } - @Override protected SocketAddress localAddress0() { return localAddress; } - @Override - protected void doRegister() throws Exception { - ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook); - } - @Override protected void doBind(SocketAddress localAddress) throws Exception { this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress); @@ -112,11 +101,6 @@ public class LocalServerChannel extends AbstractServerChannel { } } - @Override - protected void doDeregister() throws Exception { - ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook); - } - @Override protected void doBeginRead() throws Exception { if (acceptInProgress) { @@ -179,4 +163,26 @@ public class LocalServerChannel extends AbstractServerChannel { readInbound(); } } + + @Override + protected AbstractUnsafe newUnsafe() { + return new DefaultServerUnsafe(); + } + + private final class DefaultServerUnsafe extends AbstractUnsafe implements LocalChannelUnsafe { + @Override + public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + safeSetFailure(promise, new UnsupportedOperationException()); + } + + @Override + public void register0(LocalEventLoop eventLoop) { + eventLoop.addShutdownHook(shutdownHook); + } + + @Override + public void deregister0(LocalEventLoop eventLoop) { + eventLoop.removeShutdownHook(shutdownHook); + } + } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index f7ee743521..530f527468 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -35,7 +35,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.net.SocketAddress; -import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.SelectableChannel; @@ -115,11 +114,6 @@ public abstract class AbstractNioChannel extends AbstractChannel { return ch; } - @Override - public NioEventLoop eventLoop() { - return (NioEventLoop) super.eventLoop(); - } - /** * Return the current {@link SelectionKey} */ @@ -375,36 +369,14 @@ public abstract class AbstractNioChannel extends AbstractChannel { } } - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof NioEventLoop; - } - @Override protected void doRegister() throws Exception { - boolean selected = false; - for (;;) { - try { - selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); - return; - } catch (CancelledKeyException e) { - if (!selected) { - // Force the Selector to select now as the "canceled" SelectionKey may still be - // cached and not removed because no Select.select(..) operation was called yet. - eventLoop().selectNow(); - selected = true; - } else { - // We forced a select operation on the selector before but the SelectionKey is still cached - // for whatever reason. JDK bug ? - throw e; - } - } - } + eventLoop().unsafe().register(this); } @Override protected void doDeregister() throws Exception { - eventLoop().cancel(selectionKey()); + eventLoop().unsafe().deregister(this); } @Override diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 7c0c6ae23a..d43a2938eb 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -17,7 +17,6 @@ package io.netty.channel.nio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; -import io.netty.channel.EventLoop; import io.netty.channel.EventLoopException; import io.netty.channel.SelectStrategy; import io.netty.channel.SingleThreadEventLoop; @@ -25,6 +24,7 @@ import io.netty.util.IntSupplier; import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ReflectionUtil; +import io.netty.util.internal.StringUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -116,6 +116,46 @@ public final class NioEventLoop extends SingleThreadEventLoop { private final SelectorProvider provider; + private static AbstractNioChannel cast(Channel channel) { + if (channel instanceof AbstractNioChannel) { + return (AbstractNioChannel) channel; + } + throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported"); + } + + private final Unsafe unsafe = new Unsafe() { + @Override + public void register(Channel channel) throws Exception { + assert inEventLoop(); + AbstractNioChannel nioChannel = cast(channel); + boolean selected = false; + for (;;) { + try { + nioChannel.selectionKey = nioChannel.javaChannel().register(unwrappedSelector(), 0, nioChannel); + return; + } catch (CancelledKeyException e) { + if (!selected) { + // Force the Selector to select now as the "canceled" SelectionKey may still be + // cached and not removed because no Select.select(..) operation was called yet. + selectNow(); + selected = true; + } else { + // We forced a select operation on the selector before but the SelectionKey is still cached + // for whatever reason. JDK bug ? + throw e; + } + } + } + } + + @Override + public void deregister(Channel channel) { + assert inEventLoop(); + AbstractNioChannel nioChannel = cast(channel); + cancel(nioChannel.selectionKey()); + } + }; + /** * Boolean that controls determines if a blocked Selector.select should * break out of its selection process. In our case we use a timeout for @@ -263,6 +303,11 @@ public final class NioEventLoop extends SingleThreadEventLoop { return provider; } + @Override + public Unsafe unsafe() { + return unsafe; + } + @Override protected Queue newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() diff --git a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java index ad4e7186c8..6b4f54cdd3 100644 --- a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java +++ b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java @@ -26,7 +26,7 @@ import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; @@ -59,8 +59,8 @@ import static org.junit.Assert.*; public class BootstrapTest { - private static final EventLoopGroup groupA = new DefaultEventLoopGroup(1); - private static final EventLoopGroup groupB = new DefaultEventLoopGroup(1); + private static final EventLoopGroup groupA = new LocalEventLoopGroup(1); + private static final EventLoopGroup groupB = new LocalEventLoopGroup(1); private static final ChannelInboundHandler dummyHandler = new DummyHandler(); @AfterClass @@ -145,7 +145,7 @@ public class BootstrapTest { @Test public void testLateRegisterSuccess() throws Exception { - DefaultEventLoopGroup group = new DefaultEventLoopGroup(1); + LocalEventLoopGroup group = new LocalEventLoopGroup(1); try { LateRegisterHandler registerHandler = new LateRegisterHandler(); ServerBootstrap bootstrap = new ServerBootstrap(); @@ -175,7 +175,7 @@ public class BootstrapTest { @Test public void testLateRegisterSuccessBindFailed() throws Exception { - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); LateRegisterHandler registerHandler = new LateRegisterHandler(); try { ServerBootstrap bootstrap = new ServerBootstrap(); @@ -226,7 +226,7 @@ public class BootstrapTest { @Test(expected = ConnectException.class, timeout = 10000) public void testLateRegistrationConnect() throws Exception { - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); LateRegisterHandler registerHandler = new LateRegisterHandler(); try { final Bootstrap bootstrapA = new Bootstrap(); diff --git a/transport/src/test/java/io/netty/bootstrap/ServerBootstrapTest.java b/transport/src/test/java/io/netty/bootstrap/ServerBootstrapTest.java index 5cae2a4a6f..458cee76bd 100644 --- a/transport/src/test/java/io/netty/bootstrap/ServerBootstrapTest.java +++ b/transport/src/test/java/io/netty/bootstrap/ServerBootstrapTest.java @@ -21,11 +21,10 @@ import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; -import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; -import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; import org.junit.Test; @@ -97,7 +96,7 @@ public class ServerBootstrapTest { } }; - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); Channel sch = null; Channel cch = null; try { diff --git a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java index fbfd050169..259d55b2b9 100644 --- a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java @@ -31,6 +31,7 @@ public class AbstractChannelTest { EventLoop eventLoop = mock(EventLoop.class); // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); + when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); TestChannel channel = new TestChannel(eventLoop); ChannelInboundHandler handler = mock(ChannelInboundHandler.class); @@ -48,6 +49,7 @@ public class AbstractChannelTest { final EventLoop eventLoop = mock(EventLoop.class); // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); + when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); doAnswer(new Answer() { @Override @@ -126,11 +128,6 @@ public class AbstractChannelTest { return new TestUnsafe(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return true; - } - @Override protected SocketAddress localAddress0() { return null; diff --git a/transport/src/test/java/io/netty/channel/BaseChannelTest.java b/transport/src/test/java/io/netty/channel/BaseChannelTest.java index 46d37eecaa..9aa31f5c53 100644 --- a/transport/src/test/java/io/netty/channel/BaseChannelTest.java +++ b/transport/src/test/java/io/netty/channel/BaseChannelTest.java @@ -20,6 +20,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; @@ -34,7 +35,7 @@ class BaseChannelTest { } ServerBootstrap getLocalServerBootstrap() { - EventLoopGroup serverGroup = new DefaultEventLoopGroup(); + EventLoopGroup serverGroup = new LocalEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(serverGroup); sb.channel(LocalServerChannel.class); @@ -48,7 +49,7 @@ class BaseChannelTest { } Bootstrap getLocalClientBootstrap() { - EventLoopGroup clientGroup = new DefaultEventLoopGroup(); + EventLoopGroup clientGroup = new LocalEventLoopGroup(); Bootstrap cb = new Bootstrap(); cb.channel(LocalChannel.class); cb.group(clientGroup); diff --git a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java index 21ce1e2541..38d867ab19 100644 --- a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java @@ -18,9 +18,11 @@ package io.netty.channel; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; +import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import org.junit.After; @@ -54,7 +56,7 @@ public class ChannelInitializerTest { @Before public void setUp() { - group = new DefaultEventLoopGroup(1); + group = new LocalEventLoopGroup(1); server = new ServerBootstrap() .group(group) .channel(LocalServerChannel.class) @@ -264,7 +266,7 @@ public class ChannelInitializerTest { final AtomicReference errorRef = new AtomicReference(); LocalAddress addr = new LocalAddress("test"); - final EventExecutor executor = new DefaultEventLoop() { + final EventExecutor executor = new AbstractEventExecutor() { private final ScheduledExecutorService execService = Executors.newSingleThreadScheduledExecutor(); @Override diff --git a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java index 951fb38a1a..0dfe3d7e45 100644 --- a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java @@ -18,6 +18,7 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.RejectedExecutionHandlers; @@ -137,7 +138,7 @@ public class ChannelOutboundBufferTest { private final ChannelConfig config = new DefaultChannelConfig(this); TestChannel() { - super(null, new DefaultEventLoop()); + super(null, new LocalEventLoopGroup(1).next()); } @Override @@ -145,11 +146,6 @@ public class ChannelOutboundBufferTest { return new TestUnsafe(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return true; - } - @Override protected SocketAddress localAddress0() { throw new UnsupportedOperationException(); diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java index 9b01d8a627..21c8015521 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java @@ -19,25 +19,23 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import io.netty.channel.local.LocalEventLoopGroup; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import io.netty.bootstrap.Bootstrap; - public class DefaultChannelPipelineTailTest { private static EventLoopGroup GROUP; @BeforeClass public static void init() { - GROUP = new DefaultEventLoopGroup(1); + GROUP = new LocalEventLoopGroup(1); } @AfterClass @@ -56,19 +54,12 @@ public class DefaultChannelPipelineTailTest { } }; - Bootstrap bootstrap = new Bootstrap() - .channelFactory(new MyChannelFactory(myChannel)) - .group(loop) - .handler(new ChannelInboundHandlerAdapter()) - .remoteAddress(new InetSocketAddress(0)); - - Channel channel = bootstrap.connect() - .sync().channel(); + myChannel.pipeline().fireChannelActive(); try { assertTrue(latch.await(1L, TimeUnit.SECONDS)); } finally { - channel.close(); + myChannel.close(); } } @@ -83,16 +74,8 @@ public class DefaultChannelPipelineTailTest { } }; - Bootstrap bootstrap = new Bootstrap() - .channelFactory(new MyChannelFactory(myChannel)) - .group(loop) - .handler(new ChannelInboundHandlerAdapter()) - .remoteAddress(new InetSocketAddress(0)); - - Channel channel = bootstrap.connect() - .sync().channel(); - - channel.close().syncUninterruptibly(); + myChannel.pipeline().fireChannelInactive(); + myChannel.close().syncUninterruptibly(); assertTrue(latch.await(1L, TimeUnit.SECONDS)); } @@ -110,22 +93,13 @@ public class DefaultChannelPipelineTailTest { } }; - Bootstrap bootstrap = new Bootstrap() - .channelFactory(new MyChannelFactory(myChannel)) - .group(loop) - .handler(new ChannelInboundHandlerAdapter()) - .remoteAddress(new InetSocketAddress(0)); - - Channel channel = bootstrap.connect() - .sync().channel(); - try { IOException ex = new IOException("testOnUnhandledInboundException"); - channel.pipeline().fireExceptionCaught(ex); + myChannel.pipeline().fireExceptionCaught(ex); assertTrue(latch.await(1L, TimeUnit.SECONDS)); assertSame(ex, causeRef.get()); } finally { - channel.close(); + myChannel.close(); } } @@ -140,20 +114,11 @@ public class DefaultChannelPipelineTailTest { } }; - Bootstrap bootstrap = new Bootstrap() - .channelFactory(new MyChannelFactory(myChannel)) - .group(loop) - .handler(new ChannelInboundHandlerAdapter()) - .remoteAddress(new InetSocketAddress(0)); - - Channel channel = bootstrap.connect() - .sync().channel(); - try { - channel.pipeline().fireChannelRead("testOnUnhandledInboundMessage"); + myChannel.pipeline().fireChannelRead("testOnUnhandledInboundMessage"); assertTrue(latch.await(1L, TimeUnit.SECONDS)); } finally { - channel.close(); + myChannel.close(); } } @@ -168,20 +133,11 @@ public class DefaultChannelPipelineTailTest { } }; - Bootstrap bootstrap = new Bootstrap() - .channelFactory(new MyChannelFactory(myChannel)) - .group(loop) - .handler(new ChannelInboundHandlerAdapter()) - .remoteAddress(new InetSocketAddress(0)); - - Channel channel = bootstrap.connect() - .sync().channel(); - try { - channel.pipeline().fireChannelReadComplete(); + myChannel.pipeline().fireChannelReadComplete(); assertTrue(latch.await(1L, TimeUnit.SECONDS)); } finally { - channel.close(); + myChannel.close(); } } @@ -196,20 +152,11 @@ public class DefaultChannelPipelineTailTest { } }; - Bootstrap bootstrap = new Bootstrap() - .channelFactory(new MyChannelFactory(myChannel)) - .group(loop) - .handler(new ChannelInboundHandlerAdapter()) - .remoteAddress(new InetSocketAddress(0)); - - Channel channel = bootstrap.connect() - .sync().channel(); - try { - channel.pipeline().fireUserEventTriggered("testOnUnhandledInboundUserEventTriggered"); + myChannel.pipeline().fireUserEventTriggered("testOnUnhandledInboundUserEventTriggered"); assertTrue(latch.await(1L, TimeUnit.SECONDS)); } finally { - channel.close(); + myChannel.close(); } } @@ -224,33 +171,11 @@ public class DefaultChannelPipelineTailTest { } }; - Bootstrap bootstrap = new Bootstrap() - .channelFactory(new MyChannelFactory(myChannel)) - .group(loop) - .handler(new ChannelInboundHandlerAdapter()) - .remoteAddress(new InetSocketAddress(0)); - - Channel channel = bootstrap.connect() - .sync().channel(); - try { - channel.pipeline().fireChannelWritabilityChanged(); + myChannel.pipeline().fireChannelWritabilityChanged(); assertTrue(latch.await(1L, TimeUnit.SECONDS)); } finally { - channel.close(); - } - } - - private static class MyChannelFactory implements ChannelFactory { - private final MyChannel channel; - - public MyChannelFactory(MyChannel channel) { - this.channel = channel; - } - - @Override - public MyChannel newChannel(EventLoop eventLoop) { - return channel; + myChannel.close(); } } @@ -296,11 +221,6 @@ public class DefaultChannelPipelineTailTest { return new MyUnsafe(); } - @Override - protected boolean isCompatible(EventLoop loop) { - return true; - } - @Override protected SocketAddress localAddress0() { return null; diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 1d0d3e0776..970f160ced 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; @@ -69,7 +70,7 @@ import static org.junit.Assert.fail; public class DefaultChannelPipelineTest { - private static final EventLoopGroup group = new DefaultEventLoopGroup(1); + private static final EventLoopGroup group = new LocalEventLoopGroup(1); private Channel self; private Channel peer; @@ -938,7 +939,7 @@ public class DefaultChannelPipelineTest { @Test(timeout = 3000) public void testAddBefore() throws Throwable { - EventLoopGroup defaultGroup = new DefaultEventLoopGroup(2); + EventLoopGroup defaultGroup = new LocalEventLoopGroup(2); try { EventLoop eventLoop1 = defaultGroup.next(); EventLoop eventLoop2 = defaultGroup.next(); @@ -1034,7 +1035,7 @@ public class DefaultChannelPipelineTest { @Test(timeout = 3000) public void testUnorderedEventExecutor() throws Throwable { EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(2); - EventLoopGroup defaultGroup = new DefaultEventLoopGroup(1); + EventLoopGroup defaultGroup = new LocalEventLoopGroup(1); try { EventLoop eventLoop1 = defaultGroup.next(); ChannelPipeline pipeline1 = new LocalChannel(eventLoop1).pipeline(); @@ -1103,7 +1104,7 @@ public class DefaultChannelPipelineTest { @Test(timeout = 3000) public void testVoidPromiseNotify() throws Throwable { - EventLoopGroup defaultGroup = new DefaultEventLoopGroup(1); + EventLoopGroup defaultGroup = new LocalEventLoopGroup(1); EventLoop eventLoop1 = defaultGroup.next(); ChannelPipeline pipeline1 = new LocalChannel(eventLoop1).pipeline(); @@ -1133,7 +1134,7 @@ public class DefaultChannelPipelineTest { // Test for https://github.com/netty/netty/issues/8676. @Test public void testHandlerRemovedOnlyCalledWhenHandlerAddedCalled() throws Exception { - EventLoopGroup group = new DefaultEventLoopGroup(1); + EventLoopGroup group = new LocalEventLoopGroup(1); try { final AtomicReference errorRef = new AtomicReference(); diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 262cd6ad94..6728084a13 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -484,6 +484,11 @@ public class SingleThreadEventLoopTest { protected void cleanup() { cleanedUp.incrementAndGet(); } + + @Override + public Unsafe unsafe() { + return null; + } } private static class SingleThreadEventLoopB extends SingleThreadEventLoop { @@ -513,5 +518,20 @@ public class SingleThreadEventLoopTest { protected void wakeup(boolean inEventLoop) { interruptThread(); } + + @Override + public Unsafe unsafe() { + return new Unsafe() { + @Override + public void register(Channel channel) { + // NOOP + } + + @Override + public void deregister(Channel channel) { + // NOOP + } + }; + } } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index c8dcd7f827..eec72f91f0 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -28,7 +28,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; @@ -71,9 +70,9 @@ public class LocalChannelTest { @BeforeClass public static void beforeClass() { - group1 = new DefaultEventLoopGroup(2); - group2 = new DefaultEventLoopGroup(2); - sharedGroup = new DefaultEventLoopGroup(1); + group1 = new LocalEventLoopGroup(2); + group2 = new LocalEventLoopGroup(2); + sharedGroup = new LocalEventLoopGroup(1); } @AfterClass @@ -228,11 +227,11 @@ public class LocalChannelTest { @Test public void localChannelRaceCondition() throws Exception { final CountDownLatch closeLatch = new CountDownLatch(1); - final EventLoopGroup clientGroup = new DefaultEventLoopGroup(1) { + final EventLoopGroup clientGroup = new LocalEventLoopGroup(1) { @Override protected EventLoop newChild(Executor threadFactory, Object... args) throws Exception { - return new SingleThreadEventLoop(this, threadFactory, true) { + return new LocalEventLoop(this, threadFactory) { @Override protected void run() { for (;;) { diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index e0bb4f4284..1feb585329 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.DefaultEventExecutorGroup; @@ -50,7 +49,7 @@ public class LocalTransportThreadModelTest { @BeforeClass public static void init() { // Configure a test server - group = new DefaultEventLoopGroup(); + group = new LocalEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(group) .channel(LocalServerChannel.class) @@ -85,7 +84,7 @@ public class LocalTransportThreadModelTest { @Test(timeout = 5000) public void testStagedExecution() throws Throwable { - EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); + EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l")); EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); ThreadNameAuditor h1 = new ThreadNameAuditor(); @@ -228,7 +227,7 @@ public class LocalTransportThreadModelTest { @Test(timeout = 30000) @Ignore public void testConcurrentMessageBufferAccess() throws Throwable { - EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); + EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l")); EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java index a57631d8d6..686aa2a535 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.DefaultEventLoopGroup; import io.netty.util.ReferenceCountUtil; import org.junit.Test; @@ -41,14 +40,14 @@ public class LocalTransportThreadModelTest2 { ServerBootstrap serverBootstrap = new ServerBootstrap(); LocalHandler serverHandler = new LocalHandler("SERVER"); serverBootstrap - .group(new DefaultEventLoopGroup(), new DefaultEventLoopGroup()) + .group(new LocalEventLoopGroup(), new LocalEventLoopGroup()) .channel(LocalServerChannel.class) .childHandler(serverHandler); Bootstrap clientBootstrap = new Bootstrap(); LocalHandler clientHandler = new LocalHandler("CLIENT"); clientBootstrap - .group(new DefaultEventLoopGroup()) + .group(new LocalEventLoopGroup()) .channel(LocalChannel.class) .remoteAddress(new LocalAddress(LOCAL_CHANNEL)).handler(clientHandler); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index 4bcca2fbf0..01c6577c5b 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.DefaultEventExecutorGroup; @@ -63,7 +62,7 @@ public class LocalTransportThreadModelTest3 { @BeforeClass public static void init() { // Configure a test server - group = new DefaultEventLoopGroup(); + group = new LocalEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(group) .channel(LocalServerChannel.class) @@ -117,7 +116,7 @@ public class LocalTransportThreadModelTest3 { } private static void testConcurrentAddRemove(boolean inbound) throws Exception { - EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l")); + EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l")); EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1")); EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2")); EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3")); diff --git a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java index 66c4e39326..b7eff25f7c 100644 --- a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java +++ b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java @@ -16,15 +16,19 @@ package io.netty.channel.socket.nio; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.AbstractNioChannel; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.AbstractEventExecutor; +import io.netty.util.concurrent.Future; import org.junit.Test; import java.io.IOException; import java.net.SocketOption; import java.net.StandardSocketOptions; import java.nio.channels.NetworkChannel; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -84,4 +88,87 @@ public abstract class AbstractNioChannelTest { eventLoopGroup.shutdownGracefully(); } } + + @Test + public void testWrapping() { + final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); + final EventLoop eventLoop = eventLoopGroup.next(); + + class WrappedEventLoop extends AbstractEventExecutor implements EventLoop { + private final EventLoop eventLoop; + + WrappedEventLoop(EventLoop eventLoop) { + super(eventLoop.parent()); + this.eventLoop = eventLoop; + } + + @Test + public EventLoopGroup parent() { + return eventLoop.parent(); + } + + @Test + public EventLoop next() { + return this; + } + + @Override + public Unsafe unsafe() { + return eventLoop.unsafe(); + } + + @Override + public void shutdown() { + eventLoop.shutdown(); + } + + @Override + public boolean inEventLoop(Thread thread) { + return eventLoop.inEventLoop(thread); + } + + @Override + public boolean isShuttingDown() { + return eventLoop.isShuttingDown(); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return eventLoop.shutdownGracefully(quietPeriod, timeout, unit); + } + + @Override + public Future terminationFuture() { + return eventLoop.terminationFuture(); + } + + @Override + public boolean isShutdown() { + return eventLoop.isShutdown(); + } + + @Override + public boolean isTerminated() { + return eventLoop.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return eventLoop.awaitTermination(timeout, unit); + } + + @Override + public void execute(Runnable command) { + eventLoop.execute(command); + } + } + + EventLoop wrapped = new WrappedEventLoop(eventLoop); + T channel = newNioChannel(wrapped); + channel.register().syncUninterruptibly(); + + assertSame(wrapped, channel.eventLoop()); + channel.close().syncUninterruptibly(); + eventLoopGroup.shutdownGracefully(); + } }