From b984ca7979670a86bb4acc3b83eed0b1125824a6 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 29 Jan 2015 06:49:14 +0100 Subject: [PATCH] Allow to change epoll mode Motivation: Netty uses edge-triggered epoll by default for performance reasons. The downside here is that a messagesPerRead limit can not be enforced correctly, as we need to consume everything from the channel when notified. Modification: - Allow to switch epoll modes before channel is registered - Some refactoring to share more code Result: It's now possible to switch epoll mode. --- .../main/c/io_netty_channel_epoll_Native.c | 5 +- .../main/c/io_netty_channel_epoll_Native.h | 5 +- .../channel/epoll/AbstractEpollChannel.java | 60 ++++--- .../epoll/AbstractEpollServerChannel.java | 18 +- .../epoll/AbstractEpollStreamChannel.java | 30 +++- .../channel/epoll/EpollChannelConfig.java | 161 ++++++++++++++++++ .../channel/epoll/EpollChannelOption.java | 4 +- .../channel/epoll/EpollDatagramChannel.java | 24 ++- .../epoll/EpollDatagramChannelConfig.java | 14 +- .../epoll/EpollDomainSocketChannel.java | 26 ++- .../epoll/EpollDomainSocketChannelConfig.java | 11 +- .../netty/channel/epoll/EpollEventLoop.java | 10 +- .../io/netty/channel/epoll/EpollMode.java | 36 ++++ .../epoll/EpollServerChannelConfig.java | 8 +- .../epoll/EpollSocketChannelConfig.java | 8 +- .../java/io/netty/channel/epoll/Native.java | 5 +- 16 files changed, 338 insertions(+), 87 deletions(-) create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollMode.java diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index 12d92e1072..98f585aec5 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -105,11 +105,8 @@ char* exceptionMessage(char* msg, int error) { } jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags, jint id) { - uint32_t events = EPOLLET; + uint32_t events = (flags & EPOLL_EDGE) ? EPOLLET : 0; - if (flags & EPOLL_ACCEPT) { - events |= EPOLLIN; - } if (flags & EPOLL_READ) { events |= EPOLLIN | EPOLLRDHUP; } diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index 7c0eaafdca..e620e5026b 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -18,8 +18,9 @@ #define EPOLL_READ 0x01 #define EPOLL_WRITE 0x02 -#define EPOLL_ACCEPT 0x04 -#define EPOLL_RDHUP 0x08 +#define EPOLL_RDHUP 0x04 +#define EPOLL_EDGE 0x08 + // Define SO_REUSEPORT if not found to fix build issues. // See https://github.com/netty/netty/issues/2558 diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 61a0516fe4..b009c8be70 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -35,7 +35,8 @@ abstract class AbstractEpollChannel extends AbstractChannel { private static final ChannelMetadata DATA = new ChannelMetadata(false); private final int readFlag; private volatile FileDescriptor fileDescriptor; - protected int flags; + protected int flags = Native.EPOLLET; + protected volatile boolean active; int id; @@ -51,6 +52,24 @@ abstract class AbstractEpollChannel extends AbstractChannel { fileDescriptor = new EpollFileDescriptor(fd); } + void setFlag(int flag) { + if (!isFlagSet(flag)) { + flags |= flag; + modifyEvents(); + } + } + + void clearFlag(int flag) { + if (isFlagSet(flag)) { + flags &= ~flag; + modifyEvents(); + } + } + + boolean isFlagSet(int flag) { + return (flags & flag) != 0; + } + /** * Returns the {@link FileDescriptor} that is used by this {@link Channel}. */ @@ -58,6 +77,9 @@ abstract class AbstractEpollChannel extends AbstractChannel { return fileDescriptor; } + @Override + public abstract EpollChannelConfig config(); + @Override public boolean isActive() { return active; @@ -105,10 +127,7 @@ abstract class AbstractEpollChannel extends AbstractChannel { // Channel.read() or ChannelHandlerContext.read() was called ((AbstractEpollUnsafe) unsafe()).readPending = true; - if ((flags & readFlag) == 0) { - flags |= readFlag; - modifyEvents(); - } + setFlag(readFlag); } final void clearEpollIn() { @@ -137,22 +156,8 @@ abstract class AbstractEpollChannel extends AbstractChannel { } } - protected final void setEpollOut() { - if ((flags & Native.EPOLLOUT) == 0) { - flags |= Native.EPOLLOUT; - modifyEvents(); - } - } - - protected final void clearEpollOut() { - if ((flags & Native.EPOLLOUT) != 0) { - flags &= ~Native.EPOLLOUT; - modifyEvents(); - } - } - private void modifyEvents() { - if (isOpen()) { + if (isOpen() && isRegistered()) { ((EpollEventLoop) eventLoop()).modify(this); } } @@ -250,7 +255,7 @@ abstract class AbstractEpollChannel extends AbstractChannel { readerIndex += localFlushedAmount; } else { // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); + setFlag(Native.EPOLLOUT); return writtenBytes; } } @@ -273,7 +278,7 @@ abstract class AbstractEpollChannel extends AbstractChannel { } } else { // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); + setFlag(Native.EPOLLOUT); break; } } @@ -301,7 +306,7 @@ abstract class AbstractEpollChannel extends AbstractChannel { // Flush immediately only when there's no pending flush. // If there's a pending flush operation, event loop will call forceFlush() later, // and thus there's no need to call it now. - if (isFlushPending()) { + if (isFlagSet(Native.EPOLLOUT)) { return; } super.flush0(); @@ -315,15 +320,8 @@ abstract class AbstractEpollChannel extends AbstractChannel { super.flush0(); } - private boolean isFlushPending() { - return (flags & Native.EPOLLOUT) != 0; - } - protected final void clearEpollIn0() { - if ((flags & readFlag) != 0) { - flags &= ~readFlag; - modifyEvents(); - } + clearFlag(readFlag); } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index 05423eac16..adcb963470 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -29,7 +29,7 @@ import java.net.SocketAddress; public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel { protected AbstractEpollServerChannel(int fd) { - super(fd, Native.EPOLLACCEPT); + super(fd, Native.EPOLLIN); } @Override @@ -74,7 +74,12 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im Throwable exception = null; try { try { - for (;;) { + boolean edgeTriggered = isFlagSet(Native.EPOLLET); + // if edgeTriggered is used we need to read all messages as we are not notified again otherwise. + final int maxMessagesPerRead = edgeTriggered + ? Integer.MAX_VALUE : config().getMaxMessagesPerRead(); + int messages = 0; + do { int socketFd = Native.accept(fd().intValue()); if (socketFd == -1) { // this means everything was handled for now @@ -88,8 +93,15 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im // keep on reading as we use epoll ET and need to consume everything from the socket pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(t); + } finally { + if (!edgeTriggered && !config().isAutoRead()) { + // This is not using EPOLLET so we can stop reading + // ASAP as we will get notified again later with + // pending data + break; + } } - } + } while (++ messages < maxMessagesPerRead); } catch (Throwable t) { exception = t; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 8041f53edb..15a9cfb38f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -100,7 +100,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt); if (localWrittenBytes == 0) { // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); + setFlag(Native.EPOLLOUT); break; } expectedWrittenBytes -= localWrittenBytes; @@ -142,7 +142,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt); if (localWrittenBytes == 0) { // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); + setFlag(Native.EPOLLOUT); break; } expectedWrittenBytes -= localWrittenBytes; @@ -195,7 +195,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset); if (localFlushedAmount == 0) { // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); + setFlag(Native.EPOLLOUT); break; } @@ -223,7 +223,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (msgCount == 0) { // Wrote all messages. - clearEpollOut(); + clearFlag(Native.EPOLLOUT); break; } @@ -375,7 +375,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { try { boolean connected = Native.connect(fd().intValue(), remoteAddress); if (!connected) { - setEpollOut(); + setFlag(Native.EPOLLOUT); } success = true; return connected; @@ -557,10 +557,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { */ private boolean doFinishConnect() throws Exception { if (Native.finishConnect(fd().intValue())) { - clearEpollOut(); + clearFlag(Native.EPOLLOUT); return true; } else { - setEpollOut(); + setFlag(Native.EPOLLOUT); return false; } } @@ -587,8 +587,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { ByteBuf byteBuf = null; boolean close = false; try { + boolean edgeTriggered = isFlagSet(Native.EPOLLET); + // if edgeTriggered is used we need to read all messages as we are not notified again otherwise. + final int maxMessagesPerRead = edgeTriggered + ? Integer.MAX_VALUE : config().getMaxMessagesPerRead(); + int messages = 0; int totalReadAmount = 0; - for (;;) { + do { // we use a direct buffer here as the native implementations only be able // to handle direct buffers. byteBuf = allocHandle.allocate(allocator); @@ -618,7 +623,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { // which might mean we drained the recv buffer completely. break; } - } + if (!edgeTriggered && !config().isAutoRead()) { + // This is not using EPOLLET so we can stop reading + // ASAP as we will get notified again later with + // pending data + break; + } + } while (++ messages < maxMessagesPerRead); + pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java new file mode 100644 index 0000000000..107274c773 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java @@ -0,0 +1,161 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; + +import java.util.Map; + +public class EpollChannelConfig extends DefaultChannelConfig { + final AbstractEpollChannel channel; + + EpollChannelConfig(AbstractEpollChannel channel) { + super(channel); + this.channel = channel; + } + + @Override + public Map, Object> getOptions() { + return getOptions(super.getOptions(), EpollChannelOption.EPOLL_MODE); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == EpollChannelOption.EPOLL_MODE) { + return (T) getEpollMode(); + } + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + if (option == EpollChannelOption.EPOLL_MODE) { + setEpollMode((EpollMode) value); + } else { + return super.setOption(option, value); + } + return true; + } + + @Override + public EpollChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public EpollChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public EpollChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public EpollChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public EpollChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + public EpollChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } + + /** + * Return the {@link EpollMode} used. Default is + * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or + * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use + * {@link EpollMode#LEVEL_TRIGGERED}. + */ + public EpollMode getEpollMode() { + return channel.isFlagSet(Native.EPOLLET) + ? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED; + } + + /** + * Set the {@link EpollMode} used. Default is + * {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or + * {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use + * {@link EpollMode#LEVEL_TRIGGERED}. + * + * Be aware this config setting can only be adjusted before the channel was registered. + */ + public EpollChannelConfig setEpollMode(EpollMode mode) { + if (mode == null) { + throw new NullPointerException("mode"); + } + switch (mode) { + case EDGE_TRIGGERED: + checkChannelNotRegistered(); + channel.setFlag(Native.EPOLLET); + break; + case LEVEL_TRIGGERED: + checkChannelNotRegistered(); + channel.clearFlag(Native.EPOLLET); + break; + default: + throw new Error(); + } + return this; + } + + private void checkChannelNotRegistered() { + if (channel.isRegistered()) { + throw new IllegalStateException("EpollMode can only be changed before channel is registered"); + } + } + + @Override + protected final void autoReadCleared() { + channel.clearEpollIn(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java index 1963d57fc7..094ad7a03b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java @@ -27,7 +27,7 @@ public final class EpollChannelOption { public static final ChannelOption TCP_KEEPCNT = ChannelOption.valueOf(T, "TCP_KEEPCNT"); public static final ChannelOption DOMAIN_SOCKET_READ_MODE = ChannelOption.valueOf(T, "DOMAIN_SOCKET_READ_MODE"); - + public static final ChannelOption EPOLL_MODE = + ChannelOption.valueOf(T, "EPOLL_MODE"); private EpollChannelOption() { } - } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index cc2ff40e23..5dd920cbee 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultAddressedEnvelope; +import io.netty.channel.FileDescriptor; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; @@ -84,7 +85,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override @SuppressWarnings("deprecation") public boolean isActive() { - return fd() != EpollFileDescriptor.INVALID && + return fd() != FileDescriptor.INVALID && (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered() || active); } @@ -274,7 +275,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements Object msg = in.current(); if (msg == null) { // Wrote all messages. - clearEpollOut(); + clearFlag(Native.EPOLLOUT); break; } @@ -293,7 +294,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt); if (send == 0) { // Did not write all messages. - setEpollOut(); + setFlag(Native.EPOLLOUT); return; } for (int i = 0; i < send; i++) { @@ -317,7 +318,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements in.remove(); } else { // Did not write all messages. - setEpollOut(); + setFlag(Native.EPOLLOUT); break; } } catch (IOException e) { @@ -501,7 +502,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements final ChannelPipeline pipeline = pipeline(); Throwable exception = null; try { - for (;;) { + boolean edgeTriggered = isFlagSet(Native.EPOLLET); + // if edgeTriggered is used we need to read all messages as we are not notified again otherwise. + final int maxMessagesPerRead = edgeTriggered + ? Integer.MAX_VALUE : config().getMaxMessagesPerRead(); + int messages = 0; + do { ByteBuf data = null; try { data = allocHandle.allocate(config.getAllocator()); @@ -536,8 +542,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (data != null) { data.release(); } + if (!edgeTriggered && !config().isAutoRead()) { + // This is not using EPOLLET so we can stop reading + // ASAP as we will get notified again later with + // pending data + break; + } } - } + } while (++ messages < maxMessagesPerRead); int size = readBuf.size(); for (int i = 0; i < size; i ++) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index 19fe9f2d88..e2f405492f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -17,7 +17,6 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelOption; -import io.netty.channel.DefaultChannelConfig; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; @@ -27,7 +26,7 @@ import java.net.InetAddress; import java.net.NetworkInterface; import java.util.Map; -public final class EpollDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig { +public final class EpollDatagramChannelConfig extends EpollChannelConfig implements DatagramChannelConfig { private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048); private final EpollDatagramChannel datagramChannel; private boolean activeOnOpen; @@ -285,6 +284,12 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple throw new UnsupportedOperationException("Multicast not supported"); } + @Override + public EpollDatagramChannelConfig setEpollMode(EpollMode mode) { + super.setEpollMode(mode); + return this; + } + /** * Returns {@code true} if the SO_REUSEPORT option is set. */ @@ -303,9 +308,4 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple Native.setReusePort(datagramChannel.fd().intValue(), reusePort ? 1 : 0); return this; } - - @Override - protected void autoReadCleared() { - datagramChannel.clearEpollIn(); - } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index 0423ec1216..e0fba2ffa0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -126,7 +126,12 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel { final ChannelPipeline pipeline = pipeline(); try { - for (;;) { + boolean edgeTriggered = isFlagSet(Native.EPOLLET); + // if edgeTriggered is used we need to read all messages as we are not notified again otherwise. + final int maxMessagesPerRead = edgeTriggered + ? Integer.MAX_VALUE : config().getMaxMessagesPerRead(); + int messages = 0; + do { int socketFd = Native.recvFd(fd().intValue()); if (socketFd == 0) { break; @@ -136,8 +141,23 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel { return; } readPending = false; - pipeline.fireChannelRead(new EpollFileDescriptor(socketFd)); - } + + try { + pipeline.fireChannelRead(new EpollFileDescriptor(socketFd)); + } catch (Throwable t) { + // keep on reading as we use epoll ET and need to consume everything from the socket + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(t); + } finally { + if (!edgeTriggered && !config().isAutoRead()) { + // This is not using EPOLLET so we can stop reading + // ASAP as we will get notified again later with + // pending data + break; + } + } + } while (++ messages < maxMessagesPerRead); + pipeline.fireChannelReadComplete(); } catch (Throwable t) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java index 6dc9cc72e7..a63aa050f7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java @@ -18,18 +18,17 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; -import io.netty.channel.DefaultChannelConfig; import io.netty.channel.ChannelPipeline; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import java.util.Map; -public final class EpollDomainSocketChannelConfig extends DefaultChannelConfig { +public final class EpollDomainSocketChannelConfig extends EpollChannelConfig { private volatile DomainSocketReadMode mode = DomainSocketReadMode.BYTES; - EpollDomainSocketChannelConfig(Channel channel) { + EpollDomainSocketChannelConfig(AbstractEpollChannel channel) { super(channel); } @@ -119,6 +118,12 @@ public final class EpollDomainSocketChannelConfig extends DefaultChannelConfig { return this; } + @Override + public EpollDomainSocketChannelConfig setEpollMode(EpollMode mode) { + super.setEpollMode(mode); + return this; + } + /** * Change the {@link DomainSocketReadMode} for the channel. The default is * {@link DomainSocketReadMode#BYTES} which means bytes will be read from the diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 7e552f1ef6..e8aff4244b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -321,22 +321,18 @@ final class EpollEventLoop extends SingleThreadEventLoop { // consume wakeup event Native.eventFdRead(eventFd); } else { - boolean read = (ev & Native.EPOLLIN) != 0; - boolean write = (ev & Native.EPOLLOUT) != 0; - boolean close = (ev & Native.EPOLLRDHUP) != 0; - AbstractEpollChannel ch = ids.get(id); if (ch != null) { AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe(); - if (write && ch.isOpen()) { + if ((ev & Native.EPOLLOUT) != 0 && ch.isOpen()) { // force flush of data as the epoll is writable again unsafe.epollOutReady(); } - if (read && ch.isOpen()) { + if ((ev & Native.EPOLLIN) != 0 && ch.isOpen()) { // Something is ready to read, so consume it now unsafe.epollInReady(); } - if (close && ch.isOpen()) { + if ((ev & Native.EPOLLRDHUP) != 0 && ch.isOpen()) { unsafe.epollRdHupReady(); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollMode.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollMode.java new file mode 100644 index 0000000000..ec73fb70aa --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollMode.java @@ -0,0 +1,36 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +/** + * The epoll mode to use. + */ +public enum EpollMode { + + /** + * Use {@code EPOLLET} (edge-triggered). + * + * @see man 7 epoll. + */ + EDGE_TRIGGERED, + + /** + * Do not use {@code EPOLLET} (level-triggered). + * + * @see man 7 epoll. + */ + LEVEL_TRIGGERED +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java index 1e6d39f52d..b24e2632f6 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java @@ -17,7 +17,6 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelOption; -import io.netty.channel.DefaultChannelConfig; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.util.NetUtil; @@ -28,7 +27,7 @@ import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_RCVBUF; import static io.netty.channel.ChannelOption.SO_REUSEADDR; -public class EpollServerChannelConfig extends DefaultChannelConfig { +public class EpollServerChannelConfig extends EpollChannelConfig { protected final AbstractEpollChannel channel; private volatile int backlog = NetUtil.SOMAXCONN; @@ -159,7 +158,8 @@ public class EpollServerChannelConfig extends DefaultChannelConfig { } @Override - protected final void autoReadCleared() { - channel.clearEpollIn(); + public EpollServerChannelConfig setEpollMode(EpollMode mode) { + super.setEpollMode(mode); + return this; } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java index 0e347079ea..c3d651e7eb 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java @@ -17,7 +17,6 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelOption; -import io.netty.channel.DefaultChannelConfig; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.SocketChannelConfig; @@ -27,7 +26,7 @@ import java.util.Map; import static io.netty.channel.ChannelOption.*; -public final class EpollSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { +public final class EpollSocketChannelConfig extends EpollChannelConfig implements SocketChannelConfig { private final EpollSocketChannel channel; private volatile boolean allowHalfClosure; @@ -345,7 +344,8 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme } @Override - protected void autoReadCleared() { - channel.clearEpollIn(); + public EpollSocketChannelConfig setEpollMode(EpollMode mode) { + super.setEpollMode(mode); + return this; } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 8a25fb45e2..2caebc6336 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -51,8 +51,9 @@ final class Native { // EventLoop operations and constants public static final int EPOLLIN = 0x01; public static final int EPOLLOUT = 0x02; - public static final int EPOLLACCEPT = 0x04; - public static final int EPOLLRDHUP = 0x08; + public static final int EPOLLRDHUP = 0x04; + public static final int EPOLLET = 0x08; + public static final int IOV_MAX = iovMax(); public static final int UIO_MAX_IOV = uioMaxIov(); public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();