Allow to change epoll mode

Motivation:
Netty uses edge-triggered epoll by default for performance reasons. The downside here is that a messagesPerRead limit can not be enforced correctly, as we need to consume everything from the channel when notified.

Modification:
- Allow to switch epoll modes before channel is registered
- Some refactoring to share more code

Result:
It's now possible to switch epoll mode.
This commit is contained in:
Norman Maurer 2015-01-29 06:49:14 +01:00
parent a833fbe9b9
commit d1a60e682a
16 changed files with 338 additions and 87 deletions

View File

@ -105,11 +105,8 @@ char* exceptionMessage(char* msg, int error) {
}
jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags, jint id) {
uint32_t events = EPOLLET;
uint32_t events = (flags & EPOLL_EDGE) ? EPOLLET : 0;
if (flags & EPOLL_ACCEPT) {
events |= EPOLLIN;
}
if (flags & EPOLL_READ) {
events |= EPOLLIN | EPOLLRDHUP;
}

View File

@ -18,8 +18,9 @@
#define EPOLL_READ 0x01
#define EPOLL_WRITE 0x02
#define EPOLL_ACCEPT 0x04
#define EPOLL_RDHUP 0x08
#define EPOLL_RDHUP 0x04
#define EPOLL_EDGE 0x08
// Define SO_REUSEPORT if not found to fix build issues.
// See https://github.com/netty/netty/issues/2558

View File

@ -35,7 +35,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
private static final ChannelMetadata DATA = new ChannelMetadata(false);
private final int readFlag;
private volatile FileDescriptor fileDescriptor;
protected int flags;
protected int flags = Native.EPOLLET;
protected volatile boolean active;
int id;
@ -51,6 +52,24 @@ abstract class AbstractEpollChannel extends AbstractChannel {
fileDescriptor = new EpollFileDescriptor(fd);
}
void setFlag(int flag) {
if (!isFlagSet(flag)) {
flags |= flag;
modifyEvents();
}
}
void clearFlag(int flag) {
if (isFlagSet(flag)) {
flags &= ~flag;
modifyEvents();
}
}
boolean isFlagSet(int flag) {
return (flags & flag) != 0;
}
/**
* Returns the {@link FileDescriptor} that is used by this {@link Channel}.
*/
@ -58,6 +77,9 @@ abstract class AbstractEpollChannel extends AbstractChannel {
return fileDescriptor;
}
@Override
public abstract EpollChannelConfig config();
@Override
public boolean isActive() {
return active;
@ -105,10 +127,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// Channel.read() or ChannelHandlerContext.read() was called
((AbstractEpollUnsafe) unsafe()).readPending = true;
if ((flags & readFlag) == 0) {
flags |= readFlag;
modifyEvents();
}
setFlag(readFlag);
}
final void clearEpollIn() {
@ -137,22 +156,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
}
}
protected final void setEpollOut() {
if ((flags & Native.EPOLLOUT) == 0) {
flags |= Native.EPOLLOUT;
modifyEvents();
}
}
protected final void clearEpollOut() {
if ((flags & Native.EPOLLOUT) != 0) {
flags &= ~Native.EPOLLOUT;
modifyEvents();
}
}
private void modifyEvents() {
if (isOpen()) {
if (isOpen() && isRegistered()) {
((EpollEventLoop) eventLoop().unwrap()).modify(this);
}
}
@ -249,7 +254,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
readerIndex += localFlushedAmount;
} else {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
setFlag(Native.EPOLLOUT);
return writtenBytes;
}
}
@ -272,7 +277,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
}
} else {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
setFlag(Native.EPOLLOUT);
break;
}
}
@ -300,7 +305,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) {
if (isFlagSet(Native.EPOLLOUT)) {
return;
}
super.flush0();
@ -314,15 +319,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
super.flush0();
}
private boolean isFlushPending() {
return (flags & Native.EPOLLOUT) != 0;
}
protected final void clearEpollIn0() {
if ((flags & readFlag) != 0) {
flags &= ~readFlag;
modifyEvents();
}
clearFlag(readFlag);
}
}
}

View File

@ -29,7 +29,7 @@ import java.net.SocketAddress;
public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
protected AbstractEpollServerChannel(int fd) {
super(fd, Native.EPOLLACCEPT);
super(fd, Native.EPOLLIN);
}
@Override
@ -74,7 +74,12 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
Throwable exception = null;
try {
try {
for (;;) {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
int messages = 0;
do {
int socketFd = Native.accept(fd().intValue());
if (socketFd == -1) {
// this means everything was handled for now
@ -88,8 +93,15 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
} finally {
if (!edgeTriggered && !config().isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
}
}
}
} while (++ messages < maxMessagesPerRead);
} catch (Throwable t) {
exception = t;
}

View File

@ -100,7 +100,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
setFlag(Native.EPOLLOUT);
break;
}
expectedWrittenBytes -= localWrittenBytes;
@ -142,7 +142,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
setFlag(Native.EPOLLOUT);
break;
}
expectedWrittenBytes -= localWrittenBytes;
@ -195,7 +195,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
if (localFlushedAmount == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
setFlag(Native.EPOLLOUT);
break;
}
@ -223,7 +223,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
if (msgCount == 0) {
// Wrote all messages.
clearEpollOut();
clearFlag(Native.EPOLLOUT);
break;
}
@ -375,7 +375,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
try {
boolean connected = Native.connect(fd().intValue(), remoteAddress);
if (!connected) {
setEpollOut();
setFlag(Native.EPOLLOUT);
}
success = true;
return connected;
@ -557,10 +557,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
*/
private boolean doFinishConnect() throws Exception {
if (Native.finishConnect(fd().intValue())) {
clearEpollOut();
clearFlag(Native.EPOLLOUT);
return true;
} else {
setEpollOut();
setFlag(Native.EPOLLOUT);
return false;
}
}
@ -587,8 +587,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
ByteBuf byteBuf = null;
boolean close = false;
try {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
int messages = 0;
int totalReadAmount = 0;
for (;;) {
do {
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
@ -618,7 +623,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// which might mean we drained the recv buffer completely.
break;
}
}
if (!edgeTriggered && !config().isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);

View File

@ -0,0 +1,161 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import java.util.Map;
public class EpollChannelConfig extends DefaultChannelConfig {
final AbstractEpollChannel channel;
EpollChannelConfig(AbstractEpollChannel channel) {
super(channel);
this.channel = channel;
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), EpollChannelOption.EPOLL_MODE);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == EpollChannelOption.EPOLL_MODE) {
return (T) getEpollMode();
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == EpollChannelOption.EPOLL_MODE) {
setEpollMode((EpollMode) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public EpollChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public EpollChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public EpollChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public EpollChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public EpollChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public EpollChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
/**
* Return the {@link EpollMode} used. Default is
* {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
* {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
* {@link EpollMode#LEVEL_TRIGGERED}.
*/
public EpollMode getEpollMode() {
return channel.isFlagSet(Native.EPOLLET)
? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
}
/**
* Set the {@link EpollMode} used. Default is
* {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
* {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
* {@link EpollMode#LEVEL_TRIGGERED}.
*
* <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong>
*/
public EpollChannelConfig setEpollMode(EpollMode mode) {
if (mode == null) {
throw new NullPointerException("mode");
}
switch (mode) {
case EDGE_TRIGGERED:
checkChannelNotRegistered();
channel.setFlag(Native.EPOLLET);
break;
case LEVEL_TRIGGERED:
checkChannelNotRegistered();
channel.clearFlag(Native.EPOLLET);
break;
default:
throw new Error();
}
return this;
}
private void checkChannelNotRegistered() {
if (channel.isRegistered()) {
throw new IllegalStateException("EpollMode can only be changed before channel is registered");
}
}
@Override
protected final void autoReadCleared() {
channel.clearEpollIn();
}
}

View File

@ -27,7 +27,7 @@ public final class EpollChannelOption {
public static final ChannelOption<Integer> TCP_KEEPCNT = ChannelOption.valueOf(T, "TCP_KEEPCNT");
public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE =
ChannelOption.valueOf(T, "DOMAIN_SOCKET_READ_MODE");
public static final ChannelOption<EpollMode> EPOLL_MODE =
ChannelOption.valueOf(T, "EPOLL_MODE");
private EpollChannelOption() { }
}

View File

@ -25,6 +25,7 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.FileDescriptor;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
@ -84,7 +85,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override
@SuppressWarnings("deprecation")
public boolean isActive() {
return fd() != EpollFileDescriptor.INVALID &&
return fd() != FileDescriptor.INVALID &&
(config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
|| active);
}
@ -274,7 +275,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearEpollOut();
clearFlag(Native.EPOLLOUT);
break;
}
@ -293,7 +294,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
if (send == 0) {
// Did not write all messages.
setEpollOut();
setFlag(Native.EPOLLOUT);
return;
}
for (int i = 0; i < send; i++) {
@ -317,7 +318,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
in.remove();
} else {
// Did not write all messages.
setEpollOut();
setFlag(Native.EPOLLOUT);
break;
}
} catch (IOException e) {
@ -501,7 +502,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
for (;;) {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
int messages = 0;
do {
ByteBuf data = null;
try {
data = allocHandle.allocate(config.getAllocator());
@ -536,8 +542,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (data != null) {
data.release();
}
if (!edgeTriggered && !config().isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
}
}
}
} while (++ messages < maxMessagesPerRead);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {

View File

@ -17,7 +17,6 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
@ -27,7 +26,7 @@ import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Map;
public final class EpollDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig {
public final class EpollDatagramChannelConfig extends EpollChannelConfig implements DatagramChannelConfig {
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048);
private final EpollDatagramChannel datagramChannel;
private boolean activeOnOpen;
@ -279,6 +278,12 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
throw new UnsupportedOperationException("Multicast not supported");
}
@Override
public EpollDatagramChannelConfig setEpollMode(EpollMode mode) {
super.setEpollMode(mode);
return this;
}
/**
* Returns {@code true} if the SO_REUSEPORT option is set.
*/
@ -297,9 +302,4 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
Native.setReusePort(datagramChannel.fd().intValue(), reusePort ? 1 : 0);
return this;
}
@Override
protected void autoReadCleared() {
datagramChannel.clearEpollIn();
}
}

View File

@ -126,7 +126,12 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
final ChannelPipeline pipeline = pipeline();
try {
for (;;) {
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
final int maxMessagesPerRead = edgeTriggered
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
int messages = 0;
do {
int socketFd = Native.recvFd(fd().intValue());
if (socketFd == 0) {
break;
@ -136,8 +141,23 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
return;
}
readPending = false;
pipeline.fireChannelRead(new EpollFileDescriptor(socketFd));
}
try {
pipeline.fireChannelRead(new EpollFileDescriptor(socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
} finally {
if (!edgeTriggered && !config().isAutoRead()) {
// This is not using EPOLLET so we can stop reading
// ASAP as we will get notified again later with
// pending data
break;
}
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
} catch (Throwable t) {

View File

@ -18,18 +18,17 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import java.util.Map;
public final class EpollDomainSocketChannelConfig extends DefaultChannelConfig {
public final class EpollDomainSocketChannelConfig extends EpollChannelConfig {
private volatile DomainSocketReadMode mode =
DomainSocketReadMode.BYTES;
EpollDomainSocketChannelConfig(Channel channel) {
EpollDomainSocketChannelConfig(AbstractEpollChannel channel) {
super(channel);
}
@ -113,6 +112,12 @@ public final class EpollDomainSocketChannelConfig extends DefaultChannelConfig {
return this;
}
@Override
public EpollDomainSocketChannelConfig setEpollMode(EpollMode mode) {
super.setEpollMode(mode);
return this;
}
/**
* Change the {@link DomainSocketReadMode} for the channel. The default is
* {@link DomainSocketReadMode#BYTES} which means bytes will be read from the

View File

@ -323,22 +323,18 @@ final class EpollEventLoop extends SingleThreadEventLoop {
// consume wakeup event
Native.eventFdRead(eventFd);
} else {
boolean read = (ev & Native.EPOLLIN) != 0;
boolean write = (ev & Native.EPOLLOUT) != 0;
boolean close = (ev & Native.EPOLLRDHUP) != 0;
AbstractEpollChannel ch = ids.get(id);
if (ch != null) {
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
if (write && ch.isOpen()) {
if ((ev & Native.EPOLLOUT) != 0 && ch.isOpen()) {
// force flush of data as the epoll is writable again
unsafe.epollOutReady();
}
if (read && ch.isOpen()) {
if ((ev & Native.EPOLLIN) != 0 && ch.isOpen()) {
// Something is ready to read, so consume it now
unsafe.epollInReady();
}
if (close && ch.isOpen()) {
if ((ev & Native.EPOLLRDHUP) != 0 && ch.isOpen()) {
unsafe.epollRdHupReady();
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
/**
* The <a href="http://linux.die.net/man/7/epoll">epoll</a> mode to use.
*/
public enum EpollMode {
/**
* Use {@code EPOLLET} (edge-triggered).
*
* @see <a href="http://linux.die.net/man/7/epoll">man 7 epoll</a>.
*/
EDGE_TRIGGERED,
/**
* Do not use {@code EPOLLET} (level-triggered).
*
* @see <a href="http://linux.die.net/man/7/epoll">man 7 epoll</a>.
*/
LEVEL_TRIGGERED
}

View File

@ -17,7 +17,6 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.NetUtil;
@ -28,7 +27,7 @@ import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
public class EpollServerChannelConfig extends DefaultChannelConfig {
public class EpollServerChannelConfig extends EpollChannelConfig {
protected final AbstractEpollChannel channel;
private volatile int backlog = NetUtil.SOMAXCONN;
@ -159,7 +158,8 @@ public class EpollServerChannelConfig extends DefaultChannelConfig {
}
@Override
protected final void autoReadCleared() {
channel.clearEpollIn();
public EpollServerChannelConfig setEpollMode(EpollMode mode) {
super.setEpollMode(mode);
return this;
}
}

View File

@ -17,7 +17,6 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig;
@ -27,7 +26,7 @@ import java.util.Map;
import static io.netty.channel.ChannelOption.*;
public final class EpollSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
public final class EpollSocketChannelConfig extends EpollChannelConfig implements SocketChannelConfig {
private final EpollSocketChannel channel;
private volatile boolean allowHalfClosure;
@ -339,7 +338,8 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme
}
@Override
protected void autoReadCleared() {
channel.clearEpollIn();
public EpollSocketChannelConfig setEpollMode(EpollMode mode) {
super.setEpollMode(mode);
return this;
}
}

View File

@ -51,8 +51,9 @@ final class Native {
// EventLoop operations and constants
public static final int EPOLLIN = 0x01;
public static final int EPOLLOUT = 0x02;
public static final int EPOLLACCEPT = 0x04;
public static final int EPOLLRDHUP = 0x08;
public static final int EPOLLRDHUP = 0x04;
public static final int EPOLLET = 0x08;
public static final int IOV_MAX = iovMax();
public static final int UIO_MAX_IOV = uioMaxIov();
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();