Introduce SingleThreadEventLoop.registeredChannels (#8428)

Motivation:

Systems depending on Netty may benefit (telemetry, alternative even loop scheduling algorithms) from knowing the number of channels assigned to each EventLoop.

Modification:

Expose the number of channels registered in the EventLoop via SingleThreadEventLoop.registeredChannels.

Result:

Fixes #8276.
This commit is contained in:
Vladimir Kostyukov 2019-03-28 04:33:12 -07:00 committed by Norman Maurer
parent 8206604003
commit 0a0da67f43
10 changed files with 134 additions and 2 deletions

View File

@ -0,0 +1,53 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport;
import org.junit.Test;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.socket.ServerSocketChannel;
import static org.junit.Assert.*;
public abstract class AbstractSingleThreadEventLoopTest {
@Test
public void testChannelsRegistered() {
EventLoopGroup group = newEventLoopGroup();
final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
try {
final Channel ch1 = newChannel();
final Channel ch2 = newChannel();
assertEquals(0, loop.registeredChannels());
assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess());
assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess());
assertEquals(2, loop.registeredChannels());
assertTrue(ch1.deregister().syncUninterruptibly().isSuccess());
assertEquals(1, loop.registeredChannels());
} finally {
group.shutdownGracefully();
}
}
protected abstract EventLoopGroup newEventLoopGroup();
protected abstract ServerSocketChannel newChannel();
}

View File

@ -227,6 +227,11 @@ class EpollEventLoop extends SingleThreadEventLoop {
this.ioRatio = ioRatio;
}
@Override
public int registeredChannels() {
return channels.size();
}
private int epollWait(boolean oldWakeup) throws IOException {
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended

View File

@ -18,6 +18,8 @@ package io.netty.channel.epoll;
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.RejectedExecutionHandlers;
@ -31,7 +33,17 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class EpollEventLoopTest {
public class EpollEventLoopTest extends AbstractSingleThreadEventLoopTest {
@Override
protected EventLoopGroup newEventLoopGroup() {
return new EpollEventLoopGroup();
}
@Override
protected ServerSocketChannel newChannel() {
return new EpollServerSocketChannel();
}
@Test
public void testScheduleBigDelayNotOverflow() {

View File

@ -309,6 +309,11 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
this.ioRatio = ioRatio;
}
@Override
public int registeredChannels() {
return channels.size();
}
@Override
protected void cleanup() {
try {

View File

@ -63,6 +63,10 @@ final class NativeLongArray {
return size == 0;
}
int size() {
return size;
}
void free() {
Buffer.free(memory);
memoryAddress = 0;

View File

@ -17,6 +17,8 @@ package io.netty.channel.kqueue;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest;
import io.netty.util.concurrent.Future;
import org.junit.Test;
@ -25,7 +27,17 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class KQueueEventLoopTest {
public class KQueueEventLoopTest extends AbstractSingleThreadEventLoopTest {
@Override
protected EventLoopGroup newEventLoopGroup() {
return new KQueueEventLoopGroup();
}
@Override
protected ServerSocketChannel newChannel() {
return new KQueueServerSocketChannel();
}
@Test
public void testScheduleBigDelayNotOverflow() {

View File

@ -148,6 +148,16 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
return super.pendingTasks() + tailTasks.size();
}
/**
* Returns the number of {@link Channel}s registered with this {@link EventLoop} or {@code -1}
* if operation is not supported. The returned value is not guaranteed to be exact accurate and
* should be viewed as a best effort.
*/
@UnstableApi
public int registeredChannels() {
return -1;
}
/**
* Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
*/

View File

@ -95,4 +95,9 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
parent.activeChildren.remove(this);
parent.idleChildren.add(this);
}
@Override
public int registeredChannels() {
return 1;
}
}

View File

@ -356,6 +356,11 @@ public final class NioEventLoop extends SingleThreadEventLoop {
rebuildSelector0();
}
@Override
public int registeredChannels() {
return selector.keys().size() - cancelledKeys;
}
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;

View File

@ -258,4 +258,25 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
}
}
@Test
public void testChannelsRegistered() {
NioEventLoopGroup group = new NioEventLoopGroup(1);
final NioEventLoop loop = (NioEventLoop) group.next();
try {
final Channel ch1 = new NioServerSocketChannel();
final Channel ch2 = new NioServerSocketChannel();
assertEquals(0, loop.registeredChannels());
assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess());
assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess());
assertEquals(2, loop.registeredChannels());
assertTrue(ch1.deregister().syncUninterruptibly().isSuccess());
assertEquals(1, loop.registeredChannels());
} finally {
group.shutdownGracefully();
}
}
}