Allow to change epoll mode
Motivation: Netty uses edge-triggered epoll by default for performance reasons. The downside here is that a messagesPerRead limit can not be enforced correctly, as we need to consume everything from the channel when notified. Modification: - Allow to switch epoll modes before channel is registered - Some refactoring to share more code Result: It's now possible to switch epoll mode.
This commit is contained in:
parent
bc76bfa199
commit
b984ca7979
@ -105,11 +105,8 @@ char* exceptionMessage(char* msg, int error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags, jint id) {
|
jint epollCtl(JNIEnv* env, jint efd, int op, jint fd, jint flags, jint id) {
|
||||||
uint32_t events = EPOLLET;
|
uint32_t events = (flags & EPOLL_EDGE) ? EPOLLET : 0;
|
||||||
|
|
||||||
if (flags & EPOLL_ACCEPT) {
|
|
||||||
events |= EPOLLIN;
|
|
||||||
}
|
|
||||||
if (flags & EPOLL_READ) {
|
if (flags & EPOLL_READ) {
|
||||||
events |= EPOLLIN | EPOLLRDHUP;
|
events |= EPOLLIN | EPOLLRDHUP;
|
||||||
}
|
}
|
||||||
|
@ -18,8 +18,9 @@
|
|||||||
|
|
||||||
#define EPOLL_READ 0x01
|
#define EPOLL_READ 0x01
|
||||||
#define EPOLL_WRITE 0x02
|
#define EPOLL_WRITE 0x02
|
||||||
#define EPOLL_ACCEPT 0x04
|
#define EPOLL_RDHUP 0x04
|
||||||
#define EPOLL_RDHUP 0x08
|
#define EPOLL_EDGE 0x08
|
||||||
|
|
||||||
|
|
||||||
// Define SO_REUSEPORT if not found to fix build issues.
|
// Define SO_REUSEPORT if not found to fix build issues.
|
||||||
// See https://github.com/netty/netty/issues/2558
|
// See https://github.com/netty/netty/issues/2558
|
||||||
|
@ -35,7 +35,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
private static final ChannelMetadata DATA = new ChannelMetadata(false);
|
private static final ChannelMetadata DATA = new ChannelMetadata(false);
|
||||||
private final int readFlag;
|
private final int readFlag;
|
||||||
private volatile FileDescriptor fileDescriptor;
|
private volatile FileDescriptor fileDescriptor;
|
||||||
protected int flags;
|
protected int flags = Native.EPOLLET;
|
||||||
|
|
||||||
protected volatile boolean active;
|
protected volatile boolean active;
|
||||||
int id;
|
int id;
|
||||||
|
|
||||||
@ -51,6 +52,24 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
fileDescriptor = new EpollFileDescriptor(fd);
|
fileDescriptor = new EpollFileDescriptor(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setFlag(int flag) {
|
||||||
|
if (!isFlagSet(flag)) {
|
||||||
|
flags |= flag;
|
||||||
|
modifyEvents();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void clearFlag(int flag) {
|
||||||
|
if (isFlagSet(flag)) {
|
||||||
|
flags &= ~flag;
|
||||||
|
modifyEvents();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isFlagSet(int flag) {
|
||||||
|
return (flags & flag) != 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the {@link FileDescriptor} that is used by this {@link Channel}.
|
* Returns the {@link FileDescriptor} that is used by this {@link Channel}.
|
||||||
*/
|
*/
|
||||||
@ -58,6 +77,9 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
return fileDescriptor;
|
return fileDescriptor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public abstract EpollChannelConfig config();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isActive() {
|
public boolean isActive() {
|
||||||
return active;
|
return active;
|
||||||
@ -105,10 +127,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
// Channel.read() or ChannelHandlerContext.read() was called
|
// Channel.read() or ChannelHandlerContext.read() was called
|
||||||
((AbstractEpollUnsafe) unsafe()).readPending = true;
|
((AbstractEpollUnsafe) unsafe()).readPending = true;
|
||||||
|
|
||||||
if ((flags & readFlag) == 0) {
|
setFlag(readFlag);
|
||||||
flags |= readFlag;
|
|
||||||
modifyEvents();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final void clearEpollIn() {
|
final void clearEpollIn() {
|
||||||
@ -137,22 +156,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final void setEpollOut() {
|
|
||||||
if ((flags & Native.EPOLLOUT) == 0) {
|
|
||||||
flags |= Native.EPOLLOUT;
|
|
||||||
modifyEvents();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected final void clearEpollOut() {
|
|
||||||
if ((flags & Native.EPOLLOUT) != 0) {
|
|
||||||
flags &= ~Native.EPOLLOUT;
|
|
||||||
modifyEvents();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void modifyEvents() {
|
private void modifyEvents() {
|
||||||
if (isOpen()) {
|
if (isOpen() && isRegistered()) {
|
||||||
((EpollEventLoop) eventLoop()).modify(this);
|
((EpollEventLoop) eventLoop()).modify(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -250,7 +255,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
readerIndex += localFlushedAmount;
|
readerIndex += localFlushedAmount;
|
||||||
} else {
|
} else {
|
||||||
// Returned EAGAIN need to set EPOLLOUT
|
// Returned EAGAIN need to set EPOLLOUT
|
||||||
setEpollOut();
|
setFlag(Native.EPOLLOUT);
|
||||||
return writtenBytes;
|
return writtenBytes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -273,7 +278,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Returned EAGAIN need to set EPOLLOUT
|
// Returned EAGAIN need to set EPOLLOUT
|
||||||
setEpollOut();
|
setFlag(Native.EPOLLOUT);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -301,7 +306,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
// Flush immediately only when there's no pending flush.
|
// Flush immediately only when there's no pending flush.
|
||||||
// If there's a pending flush operation, event loop will call forceFlush() later,
|
// If there's a pending flush operation, event loop will call forceFlush() later,
|
||||||
// and thus there's no need to call it now.
|
// and thus there's no need to call it now.
|
||||||
if (isFlushPending()) {
|
if (isFlagSet(Native.EPOLLOUT)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
super.flush0();
|
super.flush0();
|
||||||
@ -315,15 +320,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
super.flush0();
|
super.flush0();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isFlushPending() {
|
|
||||||
return (flags & Native.EPOLLOUT) != 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected final void clearEpollIn0() {
|
protected final void clearEpollIn0() {
|
||||||
if ((flags & readFlag) != 0) {
|
clearFlag(readFlag);
|
||||||
flags &= ~readFlag;
|
|
||||||
modifyEvents();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ import java.net.SocketAddress;
|
|||||||
public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
|
public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
|
||||||
|
|
||||||
protected AbstractEpollServerChannel(int fd) {
|
protected AbstractEpollServerChannel(int fd) {
|
||||||
super(fd, Native.EPOLLACCEPT);
|
super(fd, Native.EPOLLIN);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -74,7 +74,12 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
|
|||||||
Throwable exception = null;
|
Throwable exception = null;
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
for (;;) {
|
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
||||||
|
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
|
||||||
|
final int maxMessagesPerRead = edgeTriggered
|
||||||
|
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
|
||||||
|
int messages = 0;
|
||||||
|
do {
|
||||||
int socketFd = Native.accept(fd().intValue());
|
int socketFd = Native.accept(fd().intValue());
|
||||||
if (socketFd == -1) {
|
if (socketFd == -1) {
|
||||||
// this means everything was handled for now
|
// this means everything was handled for now
|
||||||
@ -88,8 +93,15 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
|
|||||||
// keep on reading as we use epoll ET and need to consume everything from the socket
|
// keep on reading as we use epoll ET and need to consume everything from the socket
|
||||||
pipeline.fireChannelReadComplete();
|
pipeline.fireChannelReadComplete();
|
||||||
pipeline.fireExceptionCaught(t);
|
pipeline.fireExceptionCaught(t);
|
||||||
|
} finally {
|
||||||
|
if (!edgeTriggered && !config().isAutoRead()) {
|
||||||
|
// This is not using EPOLLET so we can stop reading
|
||||||
|
// ASAP as we will get notified again later with
|
||||||
|
// pending data
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
} while (++ messages < maxMessagesPerRead);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
exception = t;
|
exception = t;
|
||||||
}
|
}
|
||||||
|
@ -100,7 +100,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt);
|
long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt);
|
||||||
if (localWrittenBytes == 0) {
|
if (localWrittenBytes == 0) {
|
||||||
// Returned EAGAIN need to set EPOLLOUT
|
// Returned EAGAIN need to set EPOLLOUT
|
||||||
setEpollOut();
|
setFlag(Native.EPOLLOUT);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
expectedWrittenBytes -= localWrittenBytes;
|
expectedWrittenBytes -= localWrittenBytes;
|
||||||
@ -142,7 +142,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt);
|
long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt);
|
||||||
if (localWrittenBytes == 0) {
|
if (localWrittenBytes == 0) {
|
||||||
// Returned EAGAIN need to set EPOLLOUT
|
// Returned EAGAIN need to set EPOLLOUT
|
||||||
setEpollOut();
|
setFlag(Native.EPOLLOUT);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
expectedWrittenBytes -= localWrittenBytes;
|
expectedWrittenBytes -= localWrittenBytes;
|
||||||
@ -195,7 +195,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
|
Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
|
||||||
if (localFlushedAmount == 0) {
|
if (localFlushedAmount == 0) {
|
||||||
// Returned EAGAIN need to set EPOLLOUT
|
// Returned EAGAIN need to set EPOLLOUT
|
||||||
setEpollOut();
|
setFlag(Native.EPOLLOUT);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,7 +223,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
|
|
||||||
if (msgCount == 0) {
|
if (msgCount == 0) {
|
||||||
// Wrote all messages.
|
// Wrote all messages.
|
||||||
clearEpollOut();
|
clearFlag(Native.EPOLLOUT);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -375,7 +375,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
try {
|
try {
|
||||||
boolean connected = Native.connect(fd().intValue(), remoteAddress);
|
boolean connected = Native.connect(fd().intValue(), remoteAddress);
|
||||||
if (!connected) {
|
if (!connected) {
|
||||||
setEpollOut();
|
setFlag(Native.EPOLLOUT);
|
||||||
}
|
}
|
||||||
success = true;
|
success = true;
|
||||||
return connected;
|
return connected;
|
||||||
@ -557,10 +557,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
*/
|
*/
|
||||||
private boolean doFinishConnect() throws Exception {
|
private boolean doFinishConnect() throws Exception {
|
||||||
if (Native.finishConnect(fd().intValue())) {
|
if (Native.finishConnect(fd().intValue())) {
|
||||||
clearEpollOut();
|
clearFlag(Native.EPOLLOUT);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
setEpollOut();
|
setFlag(Native.EPOLLOUT);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -587,8 +587,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
ByteBuf byteBuf = null;
|
ByteBuf byteBuf = null;
|
||||||
boolean close = false;
|
boolean close = false;
|
||||||
try {
|
try {
|
||||||
|
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
||||||
|
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
|
||||||
|
final int maxMessagesPerRead = edgeTriggered
|
||||||
|
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
|
||||||
|
int messages = 0;
|
||||||
int totalReadAmount = 0;
|
int totalReadAmount = 0;
|
||||||
for (;;) {
|
do {
|
||||||
// we use a direct buffer here as the native implementations only be able
|
// we use a direct buffer here as the native implementations only be able
|
||||||
// to handle direct buffers.
|
// to handle direct buffers.
|
||||||
byteBuf = allocHandle.allocate(allocator);
|
byteBuf = allocHandle.allocate(allocator);
|
||||||
@ -618,7 +623,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
// which might mean we drained the recv buffer completely.
|
// which might mean we drained the recv buffer completely.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
if (!edgeTriggered && !config().isAutoRead()) {
|
||||||
|
// This is not using EPOLLET so we can stop reading
|
||||||
|
// ASAP as we will get notified again later with
|
||||||
|
// pending data
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} while (++ messages < maxMessagesPerRead);
|
||||||
|
|
||||||
pipeline.fireChannelReadComplete();
|
pipeline.fireChannelReadComplete();
|
||||||
allocHandle.record(totalReadAmount);
|
allocHandle.record(totalReadAmount);
|
||||||
|
|
||||||
|
@ -0,0 +1,161 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2015 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel.epoll;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.DefaultChannelConfig;
|
||||||
|
import io.netty.channel.MessageSizeEstimator;
|
||||||
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class EpollChannelConfig extends DefaultChannelConfig {
|
||||||
|
final AbstractEpollChannel channel;
|
||||||
|
|
||||||
|
EpollChannelConfig(AbstractEpollChannel channel) {
|
||||||
|
super(channel);
|
||||||
|
this.channel = channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<ChannelOption<?>, Object> getOptions() {
|
||||||
|
return getOptions(super.getOptions(), EpollChannelOption.EPOLL_MODE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
public <T> T getOption(ChannelOption<T> option) {
|
||||||
|
if (option == EpollChannelOption.EPOLL_MODE) {
|
||||||
|
return (T) getEpollMode();
|
||||||
|
}
|
||||||
|
return super.getOption(option);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> boolean setOption(ChannelOption<T> option, T value) {
|
||||||
|
validate(option, value);
|
||||||
|
if (option == EpollChannelOption.EPOLL_MODE) {
|
||||||
|
setEpollMode((EpollMode) value);
|
||||||
|
} else {
|
||||||
|
return super.setOption(option, value);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
|
||||||
|
super.setConnectTimeoutMillis(connectTimeoutMillis);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
|
||||||
|
super.setMaxMessagesPerRead(maxMessagesPerRead);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollChannelConfig setWriteSpinCount(int writeSpinCount) {
|
||||||
|
super.setWriteSpinCount(writeSpinCount);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollChannelConfig setAllocator(ByteBufAllocator allocator) {
|
||||||
|
super.setAllocator(allocator);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
|
||||||
|
super.setRecvByteBufAllocator(allocator);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollChannelConfig setAutoRead(boolean autoRead) {
|
||||||
|
super.setAutoRead(autoRead);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
|
||||||
|
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
|
||||||
|
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
|
||||||
|
super.setMessageSizeEstimator(estimator);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the {@link EpollMode} used. Default is
|
||||||
|
* {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
|
||||||
|
* {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
|
||||||
|
* {@link EpollMode#LEVEL_TRIGGERED}.
|
||||||
|
*/
|
||||||
|
public EpollMode getEpollMode() {
|
||||||
|
return channel.isFlagSet(Native.EPOLLET)
|
||||||
|
? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the {@link EpollMode} used. Default is
|
||||||
|
* {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
|
||||||
|
* {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
|
||||||
|
* {@link EpollMode#LEVEL_TRIGGERED}.
|
||||||
|
*
|
||||||
|
* <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong>
|
||||||
|
*/
|
||||||
|
public EpollChannelConfig setEpollMode(EpollMode mode) {
|
||||||
|
if (mode == null) {
|
||||||
|
throw new NullPointerException("mode");
|
||||||
|
}
|
||||||
|
switch (mode) {
|
||||||
|
case EDGE_TRIGGERED:
|
||||||
|
checkChannelNotRegistered();
|
||||||
|
channel.setFlag(Native.EPOLLET);
|
||||||
|
break;
|
||||||
|
case LEVEL_TRIGGERED:
|
||||||
|
checkChannelNotRegistered();
|
||||||
|
channel.clearFlag(Native.EPOLLET);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new Error();
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkChannelNotRegistered() {
|
||||||
|
if (channel.isRegistered()) {
|
||||||
|
throw new IllegalStateException("EpollMode can only be changed before channel is registered");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final void autoReadCleared() {
|
||||||
|
channel.clearEpollIn();
|
||||||
|
}
|
||||||
|
}
|
@ -27,7 +27,7 @@ public final class EpollChannelOption {
|
|||||||
public static final ChannelOption<Integer> TCP_KEEPCNT = ChannelOption.valueOf(T, "TCP_KEEPCNT");
|
public static final ChannelOption<Integer> TCP_KEEPCNT = ChannelOption.valueOf(T, "TCP_KEEPCNT");
|
||||||
public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE =
|
public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE =
|
||||||
ChannelOption.valueOf(T, "DOMAIN_SOCKET_READ_MODE");
|
ChannelOption.valueOf(T, "DOMAIN_SOCKET_READ_MODE");
|
||||||
|
public static final ChannelOption<EpollMode> EPOLL_MODE =
|
||||||
|
ChannelOption.valueOf(T, "EPOLL_MODE");
|
||||||
private EpollChannelOption() { }
|
private EpollChannelOption() { }
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,7 @@ import io.netty.channel.ChannelOutboundBuffer;
|
|||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.DefaultAddressedEnvelope;
|
import io.netty.channel.DefaultAddressedEnvelope;
|
||||||
|
import io.netty.channel.FileDescriptor;
|
||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.channel.socket.DatagramChannel;
|
import io.netty.channel.socket.DatagramChannel;
|
||||||
import io.netty.channel.socket.DatagramChannelConfig;
|
import io.netty.channel.socket.DatagramChannelConfig;
|
||||||
@ -84,7 +85,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public boolean isActive() {
|
public boolean isActive() {
|
||||||
return fd() != EpollFileDescriptor.INVALID &&
|
return fd() != FileDescriptor.INVALID &&
|
||||||
(config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
|
(config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
|
||||||
|| active);
|
|| active);
|
||||||
}
|
}
|
||||||
@ -274,7 +275,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
Object msg = in.current();
|
Object msg = in.current();
|
||||||
if (msg == null) {
|
if (msg == null) {
|
||||||
// Wrote all messages.
|
// Wrote all messages.
|
||||||
clearEpollOut();
|
clearFlag(Native.EPOLLOUT);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -293,7 +294,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
|
int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
|
||||||
if (send == 0) {
|
if (send == 0) {
|
||||||
// Did not write all messages.
|
// Did not write all messages.
|
||||||
setEpollOut();
|
setFlag(Native.EPOLLOUT);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < send; i++) {
|
for (int i = 0; i < send; i++) {
|
||||||
@ -317,7 +318,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
in.remove();
|
in.remove();
|
||||||
} else {
|
} else {
|
||||||
// Did not write all messages.
|
// Did not write all messages.
|
||||||
setEpollOut();
|
setFlag(Native.EPOLLOUT);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -501,7 +502,12 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
final ChannelPipeline pipeline = pipeline();
|
final ChannelPipeline pipeline = pipeline();
|
||||||
Throwable exception = null;
|
Throwable exception = null;
|
||||||
try {
|
try {
|
||||||
for (;;) {
|
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
||||||
|
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
|
||||||
|
final int maxMessagesPerRead = edgeTriggered
|
||||||
|
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
|
||||||
|
int messages = 0;
|
||||||
|
do {
|
||||||
ByteBuf data = null;
|
ByteBuf data = null;
|
||||||
try {
|
try {
|
||||||
data = allocHandle.allocate(config.getAllocator());
|
data = allocHandle.allocate(config.getAllocator());
|
||||||
@ -536,8 +542,14 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
if (data != null) {
|
if (data != null) {
|
||||||
data.release();
|
data.release();
|
||||||
}
|
}
|
||||||
|
if (!edgeTriggered && !config().isAutoRead()) {
|
||||||
|
// This is not using EPOLLET so we can stop reading
|
||||||
|
// ASAP as we will get notified again later with
|
||||||
|
// pending data
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
} while (++ messages < maxMessagesPerRead);
|
||||||
|
|
||||||
int size = readBuf.size();
|
int size = readBuf.size();
|
||||||
for (int i = 0; i < size; i ++) {
|
for (int i = 0; i < size; i ++) {
|
||||||
|
@ -17,7 +17,6 @@ package io.netty.channel.epoll;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
import io.netty.channel.DefaultChannelConfig;
|
|
||||||
import io.netty.channel.FixedRecvByteBufAllocator;
|
import io.netty.channel.FixedRecvByteBufAllocator;
|
||||||
import io.netty.channel.MessageSizeEstimator;
|
import io.netty.channel.MessageSizeEstimator;
|
||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
@ -27,7 +26,7 @@ import java.net.InetAddress;
|
|||||||
import java.net.NetworkInterface;
|
import java.net.NetworkInterface;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public final class EpollDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig {
|
public final class EpollDatagramChannelConfig extends EpollChannelConfig implements DatagramChannelConfig {
|
||||||
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048);
|
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048);
|
||||||
private final EpollDatagramChannel datagramChannel;
|
private final EpollDatagramChannel datagramChannel;
|
||||||
private boolean activeOnOpen;
|
private boolean activeOnOpen;
|
||||||
@ -285,6 +284,12 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
|
|||||||
throw new UnsupportedOperationException("Multicast not supported");
|
throw new UnsupportedOperationException("Multicast not supported");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollDatagramChannelConfig setEpollMode(EpollMode mode) {
|
||||||
|
super.setEpollMode(mode);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns {@code true} if the SO_REUSEPORT option is set.
|
* Returns {@code true} if the SO_REUSEPORT option is set.
|
||||||
*/
|
*/
|
||||||
@ -303,9 +308,4 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
|
|||||||
Native.setReusePort(datagramChannel.fd().intValue(), reusePort ? 1 : 0);
|
Native.setReusePort(datagramChannel.fd().intValue(), reusePort ? 1 : 0);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void autoReadCleared() {
|
|
||||||
datagramChannel.clearEpollIn();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -126,7 +126,12 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
|||||||
final ChannelPipeline pipeline = pipeline();
|
final ChannelPipeline pipeline = pipeline();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (;;) {
|
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
||||||
|
// if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
|
||||||
|
final int maxMessagesPerRead = edgeTriggered
|
||||||
|
? Integer.MAX_VALUE : config().getMaxMessagesPerRead();
|
||||||
|
int messages = 0;
|
||||||
|
do {
|
||||||
int socketFd = Native.recvFd(fd().intValue());
|
int socketFd = Native.recvFd(fd().intValue());
|
||||||
if (socketFd == 0) {
|
if (socketFd == 0) {
|
||||||
break;
|
break;
|
||||||
@ -136,8 +141,23 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
readPending = false;
|
readPending = false;
|
||||||
pipeline.fireChannelRead(new EpollFileDescriptor(socketFd));
|
|
||||||
}
|
try {
|
||||||
|
pipeline.fireChannelRead(new EpollFileDescriptor(socketFd));
|
||||||
|
} catch (Throwable t) {
|
||||||
|
// keep on reading as we use epoll ET and need to consume everything from the socket
|
||||||
|
pipeline.fireChannelReadComplete();
|
||||||
|
pipeline.fireExceptionCaught(t);
|
||||||
|
} finally {
|
||||||
|
if (!edgeTriggered && !config().isAutoRead()) {
|
||||||
|
// This is not using EPOLLET so we can stop reading
|
||||||
|
// ASAP as we will get notified again later with
|
||||||
|
// pending data
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (++ messages < maxMessagesPerRead);
|
||||||
|
|
||||||
pipeline.fireChannelReadComplete();
|
pipeline.fireChannelReadComplete();
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
@ -18,18 +18,17 @@ package io.netty.channel.epoll;
|
|||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
import io.netty.channel.DefaultChannelConfig;
|
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.MessageSizeEstimator;
|
import io.netty.channel.MessageSizeEstimator;
|
||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public final class EpollDomainSocketChannelConfig extends DefaultChannelConfig {
|
public final class EpollDomainSocketChannelConfig extends EpollChannelConfig {
|
||||||
private volatile DomainSocketReadMode mode =
|
private volatile DomainSocketReadMode mode =
|
||||||
DomainSocketReadMode.BYTES;
|
DomainSocketReadMode.BYTES;
|
||||||
|
|
||||||
EpollDomainSocketChannelConfig(Channel channel) {
|
EpollDomainSocketChannelConfig(AbstractEpollChannel channel) {
|
||||||
super(channel);
|
super(channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,6 +118,12 @@ public final class EpollDomainSocketChannelConfig extends DefaultChannelConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EpollDomainSocketChannelConfig setEpollMode(EpollMode mode) {
|
||||||
|
super.setEpollMode(mode);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Change the {@link DomainSocketReadMode} for the channel. The default is
|
* Change the {@link DomainSocketReadMode} for the channel. The default is
|
||||||
* {@link DomainSocketReadMode#BYTES} which means bytes will be read from the
|
* {@link DomainSocketReadMode#BYTES} which means bytes will be read from the
|
||||||
|
@ -321,22 +321,18 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
|||||||
// consume wakeup event
|
// consume wakeup event
|
||||||
Native.eventFdRead(eventFd);
|
Native.eventFdRead(eventFd);
|
||||||
} else {
|
} else {
|
||||||
boolean read = (ev & Native.EPOLLIN) != 0;
|
|
||||||
boolean write = (ev & Native.EPOLLOUT) != 0;
|
|
||||||
boolean close = (ev & Native.EPOLLRDHUP) != 0;
|
|
||||||
|
|
||||||
AbstractEpollChannel ch = ids.get(id);
|
AbstractEpollChannel ch = ids.get(id);
|
||||||
if (ch != null) {
|
if (ch != null) {
|
||||||
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
|
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
|
||||||
if (write && ch.isOpen()) {
|
if ((ev & Native.EPOLLOUT) != 0 && ch.isOpen()) {
|
||||||
// force flush of data as the epoll is writable again
|
// force flush of data as the epoll is writable again
|
||||||
unsafe.epollOutReady();
|
unsafe.epollOutReady();
|
||||||
}
|
}
|
||||||
if (read && ch.isOpen()) {
|
if ((ev & Native.EPOLLIN) != 0 && ch.isOpen()) {
|
||||||
// Something is ready to read, so consume it now
|
// Something is ready to read, so consume it now
|
||||||
unsafe.epollInReady();
|
unsafe.epollInReady();
|
||||||
}
|
}
|
||||||
if (close && ch.isOpen()) {
|
if ((ev & Native.EPOLLRDHUP) != 0 && ch.isOpen()) {
|
||||||
unsafe.epollRdHupReady();
|
unsafe.epollRdHupReady();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,36 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2015 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel.epoll;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The <a href="http://linux.die.net/man/7/epoll">epoll</a> mode to use.
|
||||||
|
*/
|
||||||
|
public enum EpollMode {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use {@code EPOLLET} (edge-triggered).
|
||||||
|
*
|
||||||
|
* @see <a href="http://linux.die.net/man/7/epoll">man 7 epoll</a>.
|
||||||
|
*/
|
||||||
|
EDGE_TRIGGERED,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do not use {@code EPOLLET} (level-triggered).
|
||||||
|
*
|
||||||
|
* @see <a href="http://linux.die.net/man/7/epoll">man 7 epoll</a>.
|
||||||
|
*/
|
||||||
|
LEVEL_TRIGGERED
|
||||||
|
}
|
@ -17,7 +17,6 @@ package io.netty.channel.epoll;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
import io.netty.channel.DefaultChannelConfig;
|
|
||||||
import io.netty.channel.MessageSizeEstimator;
|
import io.netty.channel.MessageSizeEstimator;
|
||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.util.NetUtil;
|
import io.netty.util.NetUtil;
|
||||||
@ -28,7 +27,7 @@ import static io.netty.channel.ChannelOption.SO_BACKLOG;
|
|||||||
import static io.netty.channel.ChannelOption.SO_RCVBUF;
|
import static io.netty.channel.ChannelOption.SO_RCVBUF;
|
||||||
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
|
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
|
||||||
|
|
||||||
public class EpollServerChannelConfig extends DefaultChannelConfig {
|
public class EpollServerChannelConfig extends EpollChannelConfig {
|
||||||
protected final AbstractEpollChannel channel;
|
protected final AbstractEpollChannel channel;
|
||||||
private volatile int backlog = NetUtil.SOMAXCONN;
|
private volatile int backlog = NetUtil.SOMAXCONN;
|
||||||
|
|
||||||
@ -159,7 +158,8 @@ public class EpollServerChannelConfig extends DefaultChannelConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected final void autoReadCleared() {
|
public EpollServerChannelConfig setEpollMode(EpollMode mode) {
|
||||||
channel.clearEpollIn();
|
super.setEpollMode(mode);
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,6 @@ package io.netty.channel.epoll;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
import io.netty.channel.DefaultChannelConfig;
|
|
||||||
import io.netty.channel.MessageSizeEstimator;
|
import io.netty.channel.MessageSizeEstimator;
|
||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.channel.socket.SocketChannelConfig;
|
import io.netty.channel.socket.SocketChannelConfig;
|
||||||
@ -27,7 +26,7 @@ import java.util.Map;
|
|||||||
|
|
||||||
import static io.netty.channel.ChannelOption.*;
|
import static io.netty.channel.ChannelOption.*;
|
||||||
|
|
||||||
public final class EpollSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
|
public final class EpollSocketChannelConfig extends EpollChannelConfig implements SocketChannelConfig {
|
||||||
|
|
||||||
private final EpollSocketChannel channel;
|
private final EpollSocketChannel channel;
|
||||||
private volatile boolean allowHalfClosure;
|
private volatile boolean allowHalfClosure;
|
||||||
@ -345,7 +344,8 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void autoReadCleared() {
|
public EpollSocketChannelConfig setEpollMode(EpollMode mode) {
|
||||||
channel.clearEpollIn();
|
super.setEpollMode(mode);
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -51,8 +51,9 @@ final class Native {
|
|||||||
// EventLoop operations and constants
|
// EventLoop operations and constants
|
||||||
public static final int EPOLLIN = 0x01;
|
public static final int EPOLLIN = 0x01;
|
||||||
public static final int EPOLLOUT = 0x02;
|
public static final int EPOLLOUT = 0x02;
|
||||||
public static final int EPOLLACCEPT = 0x04;
|
public static final int EPOLLRDHUP = 0x04;
|
||||||
public static final int EPOLLRDHUP = 0x08;
|
public static final int EPOLLET = 0x08;
|
||||||
|
|
||||||
public static final int IOV_MAX = iovMax();
|
public static final int IOV_MAX = iovMax();
|
||||||
public static final int UIO_MAX_IOV = uioMaxIov();
|
public static final int UIO_MAX_IOV = uioMaxIov();
|
||||||
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();
|
public static final boolean IS_SUPPORTING_SENDMMSG = isSupportingSendmmsg();
|
||||||
|
Loading…
Reference in New Issue
Block a user