diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index 19a8372419..96e55e1ecc 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -213,6 +213,33 @@ static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, j return -err; } +static inline void cpu_relax() { +#if defined(__x86_64__) + asm volatile("pause\n": : :"memory"); +#endif +} + +static jint netty_epoll_native_epollBusyWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len) { + struct epoll_event *ev = (struct epoll_event*) (intptr_t) address; + int result, err; + + // Zeros = poll (aka return immediately). + do { + result = epoll_wait(efd, ev, len, 0); + if (result == 0) { + // Since we're always polling epoll_wait with no timeout, + // signal CPU that we're in a busy loop + cpu_relax(); + } + + if (result >= 0) { + return result; + } + } while((err = errno) == EINTR); + + return -err; +} + static jint netty_epoll_native_epollCtlAdd0(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags) { int res = epollCtl(env, efd, EPOLL_CTL_ADD, fd, flags); if (res < 0) { @@ -387,6 +414,7 @@ static const JNINativeMethod fixed_method_table[] = { { "timerFdRead", "(I)V", (void *) netty_epoll_native_timerFdRead }, { "epollCreate", "()I", (void *) netty_epoll_native_epollCreate }, { "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, + { "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 }, { "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 }, { "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 }, { "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 }, diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 462b0b0a33..a2707d96a9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -257,6 +257,10 @@ final class EpollEventLoop extends SingleThreadEventLoop { return Native.epollWait(epollFd, events, timerFd, 0, 0); } + private int epollBusyWait() throws IOException { + return Native.epollBusyWait(epollFd, events); + } + @Override protected void run() { for (;;) { @@ -265,6 +269,11 @@ final class EpollEventLoop extends SingleThreadEventLoop { switch (strategy) { case SelectStrategy.CONTINUE: continue; + + case SelectStrategy.BUSY_WAIT: + strategy = epollBusyWait(); + break; + case SelectStrategy.SELECT: strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 7c501da80b..1eb2e5d3ff 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -120,6 +120,21 @@ public final class Native { } private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs); + /** + * Non-blocking variant of + * {@link #epollWait(FileDescriptor, EpollEventArray, FileDescriptor, int, int)} + * that will also hint to processor we are in a busy-wait loop. + */ + public static int epollBusyWait(FileDescriptor epollFd, EpollEventArray events) throws IOException { + int ready = epollBusyWait0(epollFd.intValue(), events.memoryAddress(), events.length()); + if (ready < 0) { + throw newIOException("epoll_wait", ready); + } + return ready; + } + + private static native int epollBusyWait0(int efd, long address, int len); + public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException { int res = epollCtlAdd0(efd, fd, flags); if (res < 0) { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketStringEchoBusyWaitTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketStringEchoBusyWaitTest.java new file mode 100644 index 0000000000..e599e3bef8 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketStringEchoBusyWaitTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; + +public class EpollETSocketStringEchoBusyWaitTest extends EpollSocketStringEchoBusyWaitTest { + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketStringEchoBusyWaitTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketStringEchoBusyWaitTest.java new file mode 100644 index 0000000000..d6098774c0 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketStringEchoBusyWaitTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; + +public class EpollLTSocketStringEchoBusyWaitTest extends EpollSocketStringEchoBusyWaitTest { + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoBusyWaitTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoBusyWaitTest.java new file mode 100644 index 0000000000..f67b6da94f --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoBusyWaitTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SelectStrategy; +import io.netty.channel.SelectStrategyFactory; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory; +import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; +import io.netty.testsuite.transport.socket.SocketStringEchoTest; +import io.netty.util.IntSupplier; +import io.netty.util.concurrent.DefaultThreadFactory; + +public class EpollSocketStringEchoBusyWaitTest extends SocketStringEchoTest { + + private static EventLoopGroup EPOLL_LOOP; + + @BeforeClass + public static void setup() throws Exception { + EPOLL_LOOP = new EpollEventLoopGroup(2, new DefaultThreadFactory("testsuite-epoll-busy-wait", true), + new SelectStrategyFactory() { + @Override + public SelectStrategy newSelectStrategy() { + return new SelectStrategy() { + @Override + public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) { + return SelectStrategy.BUSY_WAIT; + } + }; + } + }); + } + + @AfterClass + public static void teardown() throws Exception { + if (EPOLL_LOOP != null) { + EPOLL_LOOP.shutdownGracefully(); + } + } + + @Override + protected List> newFactories() { + List> list = + new ArrayList>(); + final BootstrapFactory sbf = serverSocket(); + final BootstrapFactory cbf = clientSocket(); + list.add(new BootstrapComboFactory() { + @Override + public ServerBootstrap newServerInstance() { + return sbf.newInstance(); + } + + @Override + public Bootstrap newClientInstance() { + return cbf.newInstance(); + } + }); + + return list; + } + + private static BootstrapFactory serverSocket() { + return new BootstrapFactory() { + @Override + public ServerBootstrap newInstance() { + return new ServerBootstrap().group(EPOLL_LOOP, EPOLL_LOOP).channel(EpollServerSocketChannel.class); + } + }; + } + + private static BootstrapFactory clientSocket() { + return new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(EPOLL_LOOP).channel(EpollSocketChannel.class); + } + }; + } +} diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index 64a09c97d1..8f2a4ca45d 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -208,6 +208,10 @@ final class KQueueEventLoop extends SingleThreadEventLoop { switch (strategy) { case SelectStrategy.CONTINUE: continue; + + case SelectStrategy.BUSY_WAIT: + // fall-through to SELECT since the busy-wait is not supported with kqueue + case SelectStrategy.SELECT: strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1); diff --git a/transport/src/main/java/io/netty/channel/SelectStrategy.java b/transport/src/main/java/io/netty/channel/SelectStrategy.java index 0c8aca5789..447fb7f3f1 100644 --- a/transport/src/main/java/io/netty/channel/SelectStrategy.java +++ b/transport/src/main/java/io/netty/channel/SelectStrategy.java @@ -33,6 +33,10 @@ public interface SelectStrategy { * Indicates the IO loop should be retried, no blocking select to follow directly. */ int CONTINUE = -2; + /** + * Indicates the IO loop to poll for new events without blocking. + */ + int BUSY_WAIT = -3; /** * The {@link SelectStrategy} can be used to steer the outcome of a potential select diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index a95d514c19..c574dded63 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -404,6 +404,10 @@ public final class NioEventLoop extends SingleThreadEventLoop { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; + + case SelectStrategy.BUSY_WAIT: + // fall-through to SELECT since the busy-wait is not supported with NIO + case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));