Allow to change epoll mode

Motivation:
Netty uses edge-triggered epoll by default for performance reasons. The downside here is that a messagesPerRead limit can not be enforced correctly, as we need to consume everything from the channel when notified.

Modification:
- Allow to switch epoll modes before channel is registered
- Some refactoring to share more code

Result:
It's now possible to switch epoll mode.
This commit is contained in:
Norman Maurer 2015-01-29 06:49:14 +01:00
parent 495fa6be3c
commit dd32313fad
16 changed files with 339 additions and 85 deletions

View File

@ -105,11 +105,8 @@ char* exceptionMessage(char* msg, int error) {
} }
jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags, jint id) { jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags, jint id) {
uint32_t events = EPOLLET; uint32_t events = (flags & EPOLL_EDGE) ? EPOLLET : 0;
if (flags & EPOLL_ACCEPT) {
events |= EPOLLIN;
}
if (flags & EPOLL_READ) { if (flags & EPOLL_READ) {
events |= EPOLLIN | EPOLLRDHUP; events |= EPOLLIN | EPOLLRDHUP;
} }

View File

@ -18,8 +18,9 @@
#define EPOLL_READ 0x01 #define EPOLL_READ 0x01
#define EPOLL_WRITE 0x02 #define EPOLL_WRITE 0x02
#define EPOLL_ACCEPT 0x04 #define EPOLL_RDHUP 0x04
#define EPOLL_RDHUP 0x08 #define EPOLL_EDGE 0x08
// Define SO_REUSEPORT if not found to fix build issues. // Define SO_REUSEPORT if not found to fix build issues.
// See https://github.com/netty/netty/issues/2558 // See https://github.com/netty/netty/issues/2558

View File

@ -35,7 +35,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
private static final ChannelMetadata DATA = new ChannelMetadata(false); private static final ChannelMetadata DATA = new ChannelMetadata(false);
private final int readFlag; private final int readFlag;
private volatile FileDescriptor fileDescriptor; private volatile FileDescriptor fileDescriptor;
protected int flags; protected int flags = Native.EPOLLET;
protected volatile boolean active; protected volatile boolean active;
int id; int id;
@ -51,6 +52,24 @@ abstract class AbstractEpollChannel extends AbstractChannel {
fileDescriptor = new EpollFileDescriptor(fd); fileDescriptor = new EpollFileDescriptor(fd);
} }
void setFlag(int flag) {
if (!isFlagSet(flag)) {
flags |= flag;
modifyEvents();
}
}
void clearFlag(int flag) {
if (isFlagSet(flag)) {
flags &= ~flag;
modifyEvents();
}
}
boolean isFlagSet(int flag) {
return (flags & flag) != 0;
}
/** /**
* Returns the {@link FileDescriptor} that is used by this {@link Channel}. * Returns the {@link FileDescriptor} that is used by this {@link Channel}.
*/ */
@ -58,6 +77,9 @@ abstract class AbstractEpollChannel extends AbstractChannel {
return fileDescriptor; return fileDescriptor;
} }
@Override
public abstract EpollChannelConfig config();
@Override @Override
public boolean isActive() { public boolean isActive() {
return active; return active;
@ -105,10 +127,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// Channel.read() or ChannelHandlerContext.read() was called // Channel.read() or ChannelHandlerContext.read() was called
((AbstractEpollUnsafe) unsafe()).readPending = true; ((AbstractEpollUnsafe) unsafe()).readPending = true;
if ((flags & readFlag) == 0) { setFlag(readFlag);
flags |= readFlag;
modifyEvents();
}
} }
final void clearEpollIn() { final void clearEpollIn() {
@ -137,22 +156,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
} }
} }
protected final void setEpollOut() {
if ((flags & Native.EPOLLOUT) == 0) {
flags |= Native.EPOLLOUT;
modifyEvents();
}
}
protected final void clearEpollOut() {
if ((flags & Native.EPOLLOUT) != 0) {
flags &= ~Native.EPOLLOUT;
modifyEvents();
}
}
private void modifyEvents() { private void modifyEvents() {
if (isOpen()) { if (isOpen() && isRegistered()) {
((EpollEventLoop) eventLoop()).modify(this); ((EpollEventLoop) eventLoop()).modify(this);
} }
} }
@ -250,7 +255,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
readerIndex += localFlushedAmount; readerIndex += localFlushedAmount;
} else { } else {
// Returned EAGAIN need to set EPOLLOUT // Returned EAGAIN need to set EPOLLOUT
setEpollOut(); setFlag(Native.EPOLLOUT);
return writtenBytes; return writtenBytes;
} }
} }
@ -273,7 +278,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
} }
} else { } else {
// Returned EAGAIN need to set EPOLLOUT // Returned EAGAIN need to set EPOLLOUT
setEpollOut(); setFlag(Native.EPOLLOUT);
break; break;
} }
} }
@ -301,7 +306,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// Flush immediately only when there's no pending flush. // Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later, // If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now. // and thus there's no need to call it now.
if (isFlushPending()) { if (isFlagSet(Native.EPOLLOUT)) {
return; return;
} }
super.flush0(); super.flush0();
@ -315,15 +320,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
super.flush0(); super.flush0();
} }
private boolean isFlushPending() {
return (flags & Native.EPOLLOUT) != 0;
}
protected final void clearEpollIn0() { protected final void clearEpollIn0() {
if ((flags & readFlag) != 0) { clearFlag(readFlag);
flags &= ~readFlag;
modifyEvents();
}
} }
} }
} }

View File

@ -29,7 +29,7 @@ import java.net.SocketAddress;
public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel { public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
protected AbstractEpollServerChannel(int fd) { protected AbstractEpollServerChannel(int fd) {
super(fd, Native.EPOLLACCEPT); super(fd, Native.EPOLLIN);
} }
@Override @Override
@ -74,7 +74,12 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
Throwable exception = null; Throwable exception = null;
try { try {
try { try {
for (;;) { boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
int messages = 0;
do {
int socketFd = Native.accept(fd().intValue()); int socketFd = Native.accept(fd().intValue());
if (socketFd == -1) { if (socketFd == -1) {
// this means everything was handled for now // this means everything was handled for now
@ -88,8 +93,15 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
// keep on reading as we use epoll ET and need to consume everything from the socket // keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
} finally {
if (!edgeTriggered && !config().isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
} }
} }
} while (++ messages < maxMessagesPerRead);
} catch (Throwable t) { } catch (Throwable t) {
exception = t; exception = t;
} }

View File

@ -100,7 +100,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt); long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) { if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT // Returned EAGAIN need to set EPOLLOUT
setEpollOut(); setFlag(Native.EPOLLOUT);
break; break;
} }
expectedWrittenBytes -= localWrittenBytes; expectedWrittenBytes -= localWrittenBytes;
@ -142,7 +142,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt); long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) { if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT // Returned EAGAIN need to set EPOLLOUT
setEpollOut(); setFlag(Native.EPOLLOUT);
break; break;
} }
expectedWrittenBytes -= localWrittenBytes; expectedWrittenBytes -= localWrittenBytes;
@ -195,7 +195,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset); Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
if (localFlushedAmount == 0) { if (localFlushedAmount == 0) {
// Returned EAGAIN need to set EPOLLOUT // Returned EAGAIN need to set EPOLLOUT
setEpollOut(); setFlag(Native.EPOLLOUT);
break; break;
} }
@ -223,7 +223,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
if (msgCount == 0) { if (msgCount == 0) {
// Wrote all messages. // Wrote all messages.
clearEpollOut(); clearFlag(Native.EPOLLOUT);
break; break;
} }
@ -375,7 +375,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
try { try {
boolean connected = Native.connect(fd().intValue(), remoteAddress); boolean connected = Native.connect(fd().intValue(), remoteAddress);
if (!connected) { if (!connected) {
setEpollOut(); setFlag(Native.EPOLLOUT);
} }
success = true; success = true;
return connected; return connected;
@ -557,10 +557,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
*/ */
private boolean doFinishConnect() throws Exception { private boolean doFinishConnect() throws Exception {
if (Native.finishConnect(fd().intValue())) { if (Native.finishConnect(fd().intValue())) {
clearEpollOut(); clearFlag(Native.EPOLLOUT);
return true; return true;
} else { } else {
setEpollOut(); setFlag(Native.EPOLLOUT);
return false; return false;
} }
} }
@ -587,8 +587,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
boolean close = false; boolean close = false;
try { try {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
int messages = 0;
int totalReadAmount = 0; int totalReadAmount = 0;
for (;;) { do {
// we use a direct buffer here as the native implementations only be able // we use a direct buffer here as the native implementations only be able
// to handle direct buffers. // to handle direct buffers.
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
@ -618,7 +623,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// which might mean we drained the recv buffer completely. // which might mean we drained the recv buffer completely.
break; break;
} }
if (!edgeTriggered && !config().isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
} }
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount); allocHandle.record(totalReadAmount);

View File

@ -0,0 +1,161 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import java.util.Map;
public class EpollChannelConfig extends DefaultChannelConfig {
final AbstractEpollChannel channel;
EpollChannelConfig(AbstractEpollChannel channel) {
super(channel);
this.channel = channel;
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), EpollChannelOption.EPOLL_MODE);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == EpollChannelOption.EPOLL_MODE) {
return (T) getEpollMode();
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == EpollChannelOption.EPOLL_MODE) {
setEpollMode((EpollMode) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public EpollChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public EpollChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public EpollChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public EpollChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public EpollChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public EpollChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
/**
* Return the {@link EpollMode} used. Default is
* {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
* {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
* {@link EpollMode#LEVEL_TRIGGERED}.
*/
public EpollMode getEpollMode() {
return channel.isFlagSet(Native.EPOLLET)
? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
}
/**
* Set the {@link EpollMode} used. Default is
* {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
* {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
* {@link EpollMode#LEVEL_TRIGGERED}.
*
* <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong>
*/
public EpollChannelConfig setEpollMode(EpollMode mode) {
if (mode == null) {
throw new NullPointerException("mode");
}
switch (mode) {
case EDGE_TRIGGERED:
checkChannelNotRegistered();
channel.setFlag(Native.EPOLLET);
break;
case LEVEL_TRIGGERED:
checkChannelNotRegistered();
channel.clearFlag(Native.EPOLLET);
break;
default:
throw new Error();
}
return this;
}
private void checkChannelNotRegistered() {
if (channel.isRegistered()) {
throw new IllegalStateException("EpollMode can only be changed before channel is registered");
}
}
@Override
protected final void autoReadCleared() {
channel.clearEpollIn();
}
}

View File

@ -29,6 +29,9 @@ public final class EpollChannelOption<T> extends ChannelOption<T> {
public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE = public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE =
valueOf("DOMAIN_SOCKET_READ_MODE"); valueOf("DOMAIN_SOCKET_READ_MODE");
public static final ChannelOption<EpollMode> EPOLL_MODE =
valueOf("EPOLL_MODE");
@SuppressWarnings({ "unused", "deprecation" }) @SuppressWarnings({ "unused", "deprecation" })
private EpollChannelOption(String name) { private EpollChannelOption(String name) {
super(name); super(name);

View File

@ -25,6 +25,7 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope; import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.FileDescriptor;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramChannelConfig;
@ -84,7 +85,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override @Override
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public boolean isActive() { public boolean isActive() {
return fd() != EpollFileDescriptor.INVALID && return fd() != FileDescriptor.INVALID &&
(config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered() (config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
|| active); || active);
} }
@ -274,7 +275,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
Object msg = in.current(); Object msg = in.current();
if (msg == null) { if (msg == null) {
// Wrote all messages. // Wrote all messages.
clearEpollOut(); clearFlag(Native.EPOLLOUT);
break; break;
} }
@ -293,7 +294,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt); int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
if (send == 0) { if (send == 0) {
// Did not write all messages. // Did not write all messages.
setEpollOut(); setFlag(Native.EPOLLOUT);
return; return;
} }
for (int i = 0; i < send; i++) { for (int i = 0; i < send; i++) {
@ -317,7 +318,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
in.remove(); in.remove();
} else { } else {
// Did not write all messages. // Did not write all messages.
setEpollOut(); setFlag(Native.EPOLLOUT);
break; break;
} }
} catch (IOException e) { } catch (IOException e) {
@ -505,7 +506,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
Throwable exception = null; Throwable exception = null;
try { try {
for (;;) { boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
int messages = 0;
do {
ByteBuf data = null; ByteBuf data = null;
try { try {
data = allocHandle.allocate(config.getAllocator()); data = allocHandle.allocate(config.getAllocator());
@ -540,8 +546,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (data != null) { if (data != null) {
data.release(); data.release();
} }
if (!edgeTriggered && !config().isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
} }
} }
} while (++ messages < maxMessagesPerRead);
int size = readBuf.size(); int size = readBuf.size();
for (int i = 0; i < size; i ++) { for (int i = 0; i < size; i ++) {

View File

@ -17,7 +17,6 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
@ -27,7 +26,7 @@ import java.net.InetAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.util.Map; import java.util.Map;
public final class EpollDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig { public final class EpollDatagramChannelConfig extends EpollChannelConfig implements DatagramChannelConfig {
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048); private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048);
private final EpollDatagramChannel datagramChannel; private final EpollDatagramChannel datagramChannel;
private boolean activeOnOpen; private boolean activeOnOpen;
@ -285,6 +284,12 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
throw new UnsupportedOperationException("Multicast not supported"); throw new UnsupportedOperationException("Multicast not supported");
} }
@Override
public EpollDatagramChannelConfig setEpollMode(EpollMode mode) {
super.setEpollMode(mode);
return this;
}
/** /**
* Returns {@code true} if the SO_REUSEPORT option is set. * Returns {@code true} if the SO_REUSEPORT option is set.
*/ */
@ -303,9 +308,4 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
Native.setReusePort(datagramChannel.fd().intValue(), reusePort ? 1 : 0); Native.setReusePort(datagramChannel.fd().intValue(), reusePort ? 1 : 0);
return this; return this;
} }
@Override
protected void autoReadCleared() {
datagramChannel.clearEpollIn();
}
} }

View File

@ -126,7 +126,12 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
try { try {
for (;;) { boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
int messages = 0;
do {
int socketFd = Native.recvFd(fd().intValue()); int socketFd = Native.recvFd(fd().intValue());
if (socketFd == 0) { if (socketFd == 0) {
break; break;
@ -136,8 +141,23 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
return; return;
} }
readPending = false; readPending = false;
try {
pipeline.fireChannelRead(new EpollFileDescriptor(socketFd)); pipeline.fireChannelRead(new EpollFileDescriptor(socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
} finally {
if (!edgeTriggered && !config().isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
} }
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -18,18 +18,17 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import java.util.Map; import java.util.Map;
public final class EpollDomainSocketChannelConfig extends DefaultChannelConfig { public final class EpollDomainSocketChannelConfig extends EpollChannelConfig {
private volatile DomainSocketReadMode mode = private volatile DomainSocketReadMode mode =
DomainSocketReadMode.BYTES; DomainSocketReadMode.BYTES;
EpollDomainSocketChannelConfig(Channel channel) { EpollDomainSocketChannelConfig(AbstractEpollChannel channel) {
super(channel); super(channel);
} }
@ -119,6 +118,12 @@ public final class EpollDomainSocketChannelConfig extends DefaultChannelConfig {
return this; return this;
} }
@Override
public EpollDomainSocketChannelConfig setEpollMode(EpollMode mode) {
super.setEpollMode(mode);
return this;
}
/** /**
* Change the {@link DomainSocketReadMode} for the channel. The default is * Change the {@link DomainSocketReadMode} for the channel. The default is
* {@link DomainSocketReadMode#BYTES} which means bytes will be read from the * {@link DomainSocketReadMode#BYTES} which means bytes will be read from the

View File

@ -321,22 +321,18 @@ final class EpollEventLoop extends SingleThreadEventLoop {
// consume wakeup event // consume wakeup event
Native.eventFdRead(eventFd); Native.eventFdRead(eventFd);
} else { } else {
boolean read = (ev & Native.EPOLLIN) != 0;
boolean write = (ev & Native.EPOLLOUT) != 0;
boolean close = (ev & Native.EPOLLRDHUP) != 0;
AbstractEpollChannel ch = ids.get(id); AbstractEpollChannel ch = ids.get(id);
if (ch != null) { if (ch != null) {
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe(); AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
if (write && ch.isOpen()) { if ((ev & Native.EPOLLOUT) != 0 && ch.isOpen()) {
// force flush of data as the epoll is writable again // force flush of data as the epoll is writable again
unsafe.epollOutReady(); unsafe.epollOutReady();
} }
if (read && ch.isOpen()) { if ((ev & Native.EPOLLIN) != 0 && ch.isOpen()) {
// Something is ready to read, so consume it now // Something is ready to read, so consume it now
unsafe.epollInReady(); unsafe.epollInReady();
} }
if (close && ch.isOpen()) { if ((ev & Native.EPOLLRDHUP) != 0 && ch.isOpen()) {
unsafe.epollRdHupReady(); unsafe.epollRdHupReady();
} }
} }

View File

@ -0,0 +1,36 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
/**
* The <a href="http://linux.die.net/man/7/epoll">epoll</a> mode to use.
*/
public enum EpollMode {
/**
* Use {@code EPOLLET} (edge-triggered).
*
* @see <a href="http://linux.die.net/man/7/epoll">man 7 epoll</a>.
*/
EDGE_TRIGGERED,
/**
* Do not use {@code EPOLLET} (level-triggered).
*
* @see <a href="http://linux.die.net/man/7/epoll">man 7 epoll</a>.
*/
LEVEL_TRIGGERED
}

View File

@ -17,7 +17,6 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
@ -28,7 +27,7 @@ import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_RCVBUF; import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR; import static io.netty.channel.ChannelOption.SO_REUSEADDR;
public class EpollServerChannelConfig extends DefaultChannelConfig { public class EpollServerChannelConfig extends EpollChannelConfig {
protected final AbstractEpollChannel channel; protected final AbstractEpollChannel channel;
private volatile int backlog = NetUtil.SOMAXCONN; private volatile int backlog = NetUtil.SOMAXCONN;
@ -159,7 +158,8 @@ public class EpollServerChannelConfig extends DefaultChannelConfig {
} }
@Override @Override
protected final void autoReadCleared() { public EpollServerChannelConfig setEpollMode(EpollMode mode) {
channel.clearEpollIn(); super.setEpollMode(mode);
return this;
} }
} }

View File

@ -17,7 +17,6 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
@ -27,7 +26,7 @@ import java.util.Map;
import static io.netty.channel.ChannelOption.*; import static io.netty.channel.ChannelOption.*;
public final class EpollSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { public final class EpollSocketChannelConfig extends EpollChannelConfig implements SocketChannelConfig {
private final EpollSocketChannel channel; private final EpollSocketChannel channel;
private volatile boolean allowHalfClosure; private volatile boolean allowHalfClosure;
@ -345,7 +344,8 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme
} }
@Override @Override
protected void autoReadCleared() { public EpollSocketChannelConfig setEpollMode(EpollMode mode) {
channel.clearEpollIn(); super.setEpollMode(mode);
return this;
} }
} }

View File

@ -51,8 +51,9 @@ final class Native {
// EventLoop operations and constants // EventLoop operations and constants
public static final int EPOLLIN = 0x01; public static final int EPOLLIN = 0x01;
public static final int EPOLLOUT = 0x02; public static final int EPOLLOUT = 0x02;
public static final int EPOLLACCEPT = 0x04; public static final int EPOLLRDHUP = 0x04;
public static final int EPOLLRDHUP = 0x08; public static final int EPOLLET = 0x08;
public static final int IOV_MAX = iovMax(); public static final int IOV_MAX = iovMax();
public static final int UIO_MAX_IOV = uioMaxIov(); public static final int UIO_MAX_IOV = uioMaxIov();
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg(); public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();