From 0a0da67f43354473af9861407749d02fe62e8f6c Mon Sep 17 00:00:00 2001 From: Vladimir Kostyukov Date: Thu, 28 Mar 2019 04:33:12 -0700 Subject: [PATCH] Introduce SingleThreadEventLoop.registeredChannels (#8428) Motivation: Systems depending on Netty may benefit (telemetry, alternative even loop scheduling algorithms) from knowing the number of channels assigned to each EventLoop. Modification: Expose the number of channels registered in the EventLoop via SingleThreadEventLoop.registeredChannels. Result: Fixes #8276. --- .../AbstractSingleThreadEventLoopTest.java | 53 +++++++++++++++++++ .../netty/channel/epoll/EpollEventLoop.java | 5 ++ .../channel/epoll/EpollEventLoopTest.java | 14 ++++- .../netty/channel/kqueue/KQueueEventLoop.java | 5 ++ .../netty/channel/kqueue/NativeLongArray.java | 4 ++ .../channel/kqueue/KQueueEventLoopTest.java | 14 ++++- .../netty/channel/SingleThreadEventLoop.java | 10 ++++ .../channel/ThreadPerChannelEventLoop.java | 5 ++ .../io/netty/channel/nio/NioEventLoop.java | 5 ++ .../netty/channel/nio/NioEventLoopTest.java | 21 ++++++++ 10 files changed, 134 insertions(+), 2 deletions(-) create mode 100644 testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java new file mode 100644 index 0000000000..e4306b3dc8 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport; + +import org.junit.Test; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.socket.ServerSocketChannel; + +import static org.junit.Assert.*; + +public abstract class AbstractSingleThreadEventLoopTest { + + @Test + public void testChannelsRegistered() { + EventLoopGroup group = newEventLoopGroup(); + final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next(); + + try { + final Channel ch1 = newChannel(); + final Channel ch2 = newChannel(); + + assertEquals(0, loop.registeredChannels()); + + assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess()); + assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess()); + assertEquals(2, loop.registeredChannels()); + + assertTrue(ch1.deregister().syncUninterruptibly().isSuccess()); + assertEquals(1, loop.registeredChannels()); + } finally { + group.shutdownGracefully(); + } + } + + protected abstract EventLoopGroup newEventLoopGroup(); + protected abstract ServerSocketChannel newChannel(); +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 3420d67bc8..e7bfad61af 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -227,6 +227,11 @@ class EpollEventLoop extends SingleThreadEventLoop { this.ioRatio = ioRatio; } + @Override + public int registeredChannels() { + return channels.size(); + } + private int epollWait(boolean oldWakeup) throws IOException { // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java index 4e51114422..f250c272e6 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java @@ -18,6 +18,8 @@ package io.netty.channel.epoll; import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.RejectedExecutionHandlers; @@ -31,7 +33,17 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class EpollEventLoopTest { +public class EpollEventLoopTest extends AbstractSingleThreadEventLoopTest { + + @Override + protected EventLoopGroup newEventLoopGroup() { + return new EpollEventLoopGroup(); + } + + @Override + protected ServerSocketChannel newChannel() { + return new EpollServerSocketChannel(); + } @Test public void testScheduleBigDelayNotOverflow() { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index 25c50a4f05..eabee894ee 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -309,6 +309,11 @@ final class KQueueEventLoop extends SingleThreadEventLoop { this.ioRatio = ioRatio; } + @Override + public int registeredChannels() { + return channels.size(); + } + @Override protected void cleanup() { try { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/NativeLongArray.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/NativeLongArray.java index 7f83738e70..edcadf5519 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/NativeLongArray.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/NativeLongArray.java @@ -63,6 +63,10 @@ final class NativeLongArray { return size == 0; } + int size() { + return size; + } + void free() { Buffer.free(memory); memoryAddress = 0; diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java index 0d44155999..ceda867dea 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java @@ -17,6 +17,8 @@ package io.netty.channel.kqueue; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; import io.netty.util.concurrent.Future; import org.junit.Test; @@ -25,7 +27,17 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class KQueueEventLoopTest { +public class KQueueEventLoopTest extends AbstractSingleThreadEventLoopTest { + + @Override + protected EventLoopGroup newEventLoopGroup() { + return new KQueueEventLoopGroup(); + } + + @Override + protected ServerSocketChannel newChannel() { + return new KQueueServerSocketChannel(); + } @Test public void testScheduleBigDelayNotOverflow() { diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index c547b341f6..1fe2d3fe2b 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -148,6 +148,16 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return super.pendingTasks() + tailTasks.size(); } + /** + * Returns the number of {@link Channel}s registered with this {@link EventLoop} or {@code -1} + * if operation is not supported. The returned value is not guaranteed to be exact accurate and + * should be viewed as a best effort. + */ + @UnstableApi + public int registeredChannels() { + return -1; + } + /** * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. */ diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java index 1d4b95817c..497e796129 100644 --- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java +++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java @@ -95,4 +95,9 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop { parent.activeChildren.remove(this); parent.idleChildren.add(this); } + + @Override + public int registeredChannels() { + return 1; + } } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 100065f350..bb9a1e25cf 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -356,6 +356,11 @@ public final class NioEventLoop extends SingleThreadEventLoop { rebuildSelector0(); } + @Override + public int registeredChannels() { + return selector.keys().size() - cancelledKeys; + } + private void rebuildSelector0() { final Selector oldSelector = selector; final SelectorTuple newSelectorTuple; diff --git a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java index 8b176bc71c..7e545557cf 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java @@ -258,4 +258,25 @@ public class NioEventLoopTest extends AbstractEventLoopTest { } } + @Test + public void testChannelsRegistered() { + NioEventLoopGroup group = new NioEventLoopGroup(1); + final NioEventLoop loop = (NioEventLoop) group.next(); + + try { + final Channel ch1 = new NioServerSocketChannel(); + final Channel ch2 = new NioServerSocketChannel(); + + assertEquals(0, loop.registeredChannels()); + + assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess()); + assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess()); + assertEquals(2, loop.registeredChannels()); + + assertTrue(ch1.deregister().syncUninterruptibly().isSuccess()); + assertEquals(1, loop.registeredChannels()); + } finally { + group.shutdownGracefully(); + } + } }