diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java
index f01e39e3e3..4fd3ba07b4 100644
--- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java
+++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
@@ -135,11 +136,18 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
}
@Override
- protected void doBeginRead() throws Exception {
+ protected final void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
- ((AbstractEpollUnsafe) unsafe()).readPending = true;
+ AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
+ unsafe.readPending = true;
setFlag(readFlag);
+
+ // If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
+ // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
+ if (unsafe.maybeMoreDataToRead) {
+ unsafe.epollInReady();
+ }
}
final void clearEpollIn() {
@@ -299,6 +307,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected boolean readPending;
+ protected boolean maybeMoreDataToRead;
private EpollRecvByteAllocatorHandle allocHandle;
/**
@@ -306,6 +315,22 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
*/
abstract void epollInReady();
+ final void epollInReadAttempted() {
+ readPending = maybeMoreDataToRead = false;
+ }
+
+ final void epollInFinally(ChannelConfig config) {
+ // Check if there is a readPending which was not processed yet.
+ // This could be for two reasons:
+ // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
+ // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
+ //
+ // See https://github.com/netty/netty/issues/2254
+ if (!readPending && !config.isAutoRead()) {
+ clearEpollIn();
+ }
+ }
+
/**
* Will schedule a {@link #epollInReady()} call on the event loop if necessary.
* @param edgeTriggered {@code true} if the channel is using ET mode. {@code false} otherwise.
@@ -390,7 +415,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
* Create a new {@EpollRecvByteAllocatorHandle} instance.
* @param handle The handle to wrap with EPOLL specific logic.
*/
- protected abstract EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle);
+ EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
+ return new EpollRecvByteAllocatorHandle(handle, config());
+ }
@Override
protected void flush0() {
diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java
index ed41f3a526..2a67ad3938 100644
--- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java
+++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
-import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
@@ -104,65 +103,55 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
channelPromise.setFailure(new UnsupportedOperationException());
}
- @Override
- protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
- return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET));
- }
-
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
if (fd().isInputShutdown()) {
return;
}
- boolean edgeTriggered = isFlagSet(Native.EPOLLET);
-
+ final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
+ allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
final ChannelConfig config = config();
- if (!readPending && !edgeTriggered && !config.isAutoRead()) {
+ if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
}
final ChannelPipeline pipeline = pipeline();
- final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
Throwable exception = null;
try {
try {
do {
- int socketFd = fd().accept(acceptedAddress);
- if (socketFd == -1) {
+ // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
+ // EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
+ // enabled.
+ allocHandle.lastBytesRead(fd().accept(acceptedAddress));
+ epollInReadAttempted();
+ if (allocHandle.lastBytesRead() == -1) {
// this means everything was handled for now
break;
}
- readPending = false;
allocHandle.incMessagesRead(1);
int len = acceptedAddress[0];
- pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len));
+ pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1, len));
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
allocHandle.readComplete();
+ maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
- checkResetEpollIn(edgeTriggered);
+ checkResetEpollIn(allocHandle.isEdgeTriggered());
}
} finally {
- // Check if there is a readPending which was not processed yet.
- // This could be for two reasons:
- // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
- // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
- //
- // See https://github.com/netty/netty/issues/2254
- if (!readPending && !config.isAutoRead()) {
- clearEpollIn0();
- }
+ epollInFinally(config);
}
}
}
diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java
index 78aa2ef800..dc46213347 100644
--- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java
+++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java
@@ -660,7 +660,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
return super.prepareToClose();
}
- private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
+ private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
+ EpollRecvByteAllocatorHandle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
@@ -669,7 +670,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
byteBuf.release();
}
}
- recvBufAllocHandle().readComplete();
+ allocHandle.readComplete();
+ maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
@@ -816,8 +818,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
}
@Override
- protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
- return new EpollRecvByteAllocatorStreamingHandle(handle, isFlagSet(Native.EPOLLET));
+ EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
+ return new EpollRecvByteAllocatorStreamingHandle(handle, config());
}
@Override
@@ -826,9 +828,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
return;
}
final ChannelConfig config = config();
- boolean edgeTriggered = isFlagSet(Native.EPOLLET);
+ final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
+ allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
- if (!readPending && !edgeTriggered && !config.isAutoRead()) {
+ if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
@@ -836,7 +839,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
- final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
@@ -863,6 +865,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
+ epollInReadAttempted();
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
@@ -870,13 +873,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
close = allocHandle.lastBytesRead() < 0;
break;
}
- readPending = false;
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
+ maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
if (close) {
@@ -884,18 +887,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
close = false;
}
} catch (Throwable t) {
- handleReadException(pipeline, byteBuf, t, close);
- checkResetEpollIn(edgeTriggered);
+ handleReadException(pipeline, byteBuf, t, close, allocHandle);
+ checkResetEpollIn(allocHandle.isEdgeTriggered());
} finally {
- // Check if there is a readPending which was not processed yet.
- // This could be for two reasons:
- // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
- // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
- //
- // See https://github.com/netty/netty/issues/2254
- if (!readPending && !config.isAutoRead()) {
- clearEpollIn0();
- }
+ epollInFinally(config);
}
}
}
diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java
index ff3beb8dc0..e540328b77 100644
--- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java
+++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java
@@ -26,7 +26,6 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
-import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
@@ -519,11 +518,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
}
- @Override
- protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
- return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET));
- }
-
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
@@ -531,9 +525,10 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return;
}
DatagramChannelConfig config = config();
- boolean edgeTriggered = isFlagSet(Native.EPOLLET);
+ final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
+ allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
- if (!readPending && !edgeTriggered && !config.isAutoRead()) {
+ if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
@@ -541,7 +536,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
- final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
Throwable exception = null;
@@ -561,7 +555,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
remoteAddress = fd().recvFrom(nioData, nioData.position(), nioData.limit());
}
+ epollInReadAttempted();
if (remoteAddress == null) {
+ allocHandle.lastBytesRead(-1);
data.release();
data = null;
break;
@@ -570,7 +566,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
allocHandle.incMessagesRead(1);
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
- readPending = false;
readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
data = null;
@@ -589,22 +584,15 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
readBuf.clear();
allocHandle.readComplete();
+ maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
- checkResetEpollIn(edgeTriggered);
+ checkResetEpollIn(allocHandle.isEdgeTriggered());
}
} finally {
- // Check if there is a readPending which was not processed yet.
- // This could be for two reasons:
- // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
- // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
- //
- // See https://github.com/netty/netty/issues/2254
- if (!readPending && !config.isAutoRead()) {
- clearEpollIn();
- }
+ epollInFinally(config);
}
}
}
diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java
index 489a21c9c5..e68486f5fc 100644
--- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java
+++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java
@@ -23,7 +23,6 @@ import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
-import io.netty.util.internal.OneTimeTask;
import java.net.SocketAddress;
@@ -152,51 +151,49 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
if (fd().isInputShutdown()) {
return;
}
- boolean edgeTriggered = isFlagSet(Native.EPOLLET);
+ final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
+ allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
final ChannelConfig config = config();
- if (!readPending && !edgeTriggered && !config.isAutoRead()) {
+ if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
}
final ChannelPipeline pipeline = pipeline();
- final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
try {
- do {
- int socketFd = Native.recvFd(fd().intValue());
- if (socketFd == 0) {
- break;
- }
- if (socketFd == -1) {
+ readLoop: do {
+ // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
+ // EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
+ // enabled.
+ allocHandle.lastBytesRead(Native.recvFd(fd().intValue()));
+ epollInReadAttempted();
+ switch(allocHandle.lastBytesRead()) {
+ case 0:
+ break readLoop;
+ case -1:
close(voidPromise());
return;
+ default:
+ allocHandle.incMessagesRead(1);
+ pipeline.fireChannelRead(new FileDescriptor(allocHandle.lastBytesRead()));
+ break;
}
-
- readPending = false;
- allocHandle.incMessagesRead(1);
- pipeline.fireChannelRead(new FileDescriptor(socketFd));
} while (allocHandle.continueReading());
allocHandle.readComplete();
+ maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
} catch (Throwable t) {
allocHandle.readComplete();
+ maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
- checkResetEpollIn(edgeTriggered);
+ checkResetEpollIn(allocHandle.isEdgeTriggered());
} finally {
- // Check if there is a readPending which was not processed yet.
- // This could be for two reasons:
- // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
- // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
- //
- // See https://github.com/netty/netty/issues/2254
- if (!readPending && !config.isAutoRead()) {
- clearEpollIn0();
- }
+ epollInFinally(config);
}
}
}
diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java
index f32b89c6e4..d11aaf108a 100644
--- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java
+++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java
@@ -15,26 +15,47 @@
*/
package io.netty.channel.epoll;
+import io.netty.channel.ChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
-abstract class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle {
- private final boolean isEdgeTriggered;
+class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle {
+ private boolean isEdgeTriggered;
+ private final ChannelConfig config;
private boolean receivedRdHup;
- public EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
+ EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) {
super(handle);
- this.isEdgeTriggered = isEdgeTriggered;
+ this.config = config;
}
- public final boolean isEdgeTriggered() {
- return isEdgeTriggered;
- }
-
- public final void receivedRdHup() {
+ final void receivedRdHup() {
receivedRdHup = true;
}
- public final boolean isRdHup() {
- return receivedRdHup;
+ boolean maybeMoreDataToRead() {
+ return isEdgeTriggered && lastBytesRead() > 0;
+ }
+
+ final void edgeTriggered(boolean edgeTriggered) {
+ isEdgeTriggered = edgeTriggered;
+ }
+
+ final boolean isEdgeTriggered() {
+ return isEdgeTriggered;
+ }
+
+ @Override
+ public final boolean continueReading() {
+ /**
+ * EPOLL ET requires that we read until we get an EAGAIN
+ * (see Q9 in epoll man). However in order to
+ * respect auto read we supporting reading to stop if auto read is off. If auto read is on we force reading to
+ * continue to avoid a {@link java.lang.StackOverflowError} between channelReadComplete and reading from the
+ * channel. It is expected that the {@link #EpollSocketChannel} implementations will track if we are in
+ * edgeTriggered mode and all data was not read, and will force a EPOLLIN ready event.
+ *
+ * If EPOLLRDHUP has been received we must read until we get a read error.
+ */
+ return receivedRdHup || maybeMoreDataToRead() && config.isAutoRead() || super.continueReading();
}
}
diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java
deleted file mode 100644
index bb8a59afa9..0000000000
--- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2015 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package io.netty.channel.epoll;
-
-import io.netty.channel.RecvByteBufAllocator;
-
-/**
- * Respects termination conditions for EPOLL message (aka packet) based protocols.
- */
-final class EpollRecvByteAllocatorMessageHandle extends EpollRecvByteAllocatorHandle {
- public EpollRecvByteAllocatorMessageHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
- super(handle, isEdgeTriggered);
- }
-
- @Override
- public boolean continueReading() {
- /**
- * If edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise. For
- * packet oriented descriptors must read until we get a EAGAIN
- * (see Q9 in epoll man).
- *
- * If EPOLLRDHUP has been received we must read until we get a read error.
- */
- return isEdgeTriggered() || isRdHup() ? true : super.continueReading();
- }
-}
diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java
index b17dd75a8a..9746e74248 100644
--- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java
+++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java
@@ -15,27 +15,20 @@
*/
package io.netty.channel.epoll;
+import io.netty.channel.ChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
-/**
- * EPOLL must read until no more data is available while in edge triggered mode. This class will always continue reading
- * unless the last read did not fill up the available buffer space.
- */
final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocatorHandle {
- public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
- super(handle, isEdgeTriggered);
+ public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) {
+ super(handle, config);
}
@Override
- public boolean continueReading() {
+ boolean maybeMoreDataToRead() {
/**
- * if edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise.
* For stream oriented descriptors we can assume we are done reading if the last read attempt didn't produce
* a full buffer (see Q9 in epoll man).
- *
- * If EPOLLRDHUP has been received we must read until we get a read error.
*/
- return isRdHup() ? true :
- isEdgeTriggered() ? lastBytesRead() == attemptedBytesRead() : super.continueReading();
+ return isEdgeTriggered() && lastBytesRead() == attemptedBytesRead();
}
}
diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java
index 109f388f07..56fa8acafc 100644
--- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java
+++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java
@@ -17,14 +17,18 @@ package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;
import io.netty.util.ReferenceCountUtil;
import org.junit.Assert;
@@ -34,8 +38,10 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -128,6 +134,76 @@ public class EpollSocketChannelTest {
}
}
+ @Test
+ public void testAutoReadOffDuringReadOnlyReadsOneTime() throws InterruptedException {
+ EventLoopGroup group = new EpollEventLoopGroup();
+ try {
+ runAutoReadTest(group, EpollServerSocketChannel.class, EpollSocketChannel.class,
+ new InetSocketAddress(0));
+ runAutoReadTest(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class,
+ EpollSocketTestPermutation.newSocketAddress());
+ } finally {
+ group.shutdownGracefully();
+ }
+ }
+
+ private void runAutoReadTest(EventLoopGroup group, Class extends ServerChannel> serverChannelClass,
+ Class extends Channel> channelClass, SocketAddress bindAddr) throws InterruptedException {
+ Channel serverChannel = null;
+ Channel clientChannel = null;
+ try {
+ AutoReadInitializer serverInitializer = new AutoReadInitializer();
+ AutoReadInitializer clientInitializer = new AutoReadInitializer();
+ ServerBootstrap sb = new ServerBootstrap();
+ sb.option(ChannelOption.SO_BACKLOG, 1024)
+ .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
+ .option(ChannelOption.AUTO_READ, true)
+ .group(group)
+ .channel(serverChannelClass)
+ .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
+ .childOption(ChannelOption.AUTO_READ, true)
+ // We want to ensure that we attempt multiple individual read operations per read loop so we can
+ // test the auto read feature being turned off when data is first read.
+ .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
+ .childHandler(serverInitializer);
+
+ serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel();
+
+ Bootstrap b = new Bootstrap()
+ .group(group)
+ .channel(channelClass)
+ .remoteAddress(serverChannel.localAddress())
+ .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
+ .option(ChannelOption.AUTO_READ, true)
+ // We want to ensure that we attempt multiple individual read operations per read loop so we can
+ // test the auto read feature being turned off when data is first read.
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
+ .handler(clientInitializer);
+ clientChannel = b.connect().syncUninterruptibly().channel();
+
+ // 3 bytes means 3 independent reads for TestRecvByteBufAllocator
+ clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
+ serverInitializer.autoReadHandler.assertSingleRead();
+
+ // 3 bytes means 3 independent reads for TestRecvByteBufAllocator
+ serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
+ clientInitializer.autoReadHandler.assertSingleRead();
+
+ serverInitializer.channel.read();
+ serverInitializer.autoReadHandler.assertSingleReadSecondTry();
+
+ clientChannel.read();
+ clientInitializer.autoReadHandler.assertSingleReadSecondTry();
+ } finally {
+ if (serverChannel != null) {
+ serverChannel.close().syncUninterruptibly();
+ }
+ if (clientChannel != null) {
+ clientChannel.close().syncUninterruptibly();
+ }
+ }
+ }
+
private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class extends ServerChannel> serverChannelClass,
Class extends Channel> channelClass, SocketAddress bindAddr) throws InterruptedException {
Channel serverChannel = null;
@@ -168,6 +244,77 @@ public class EpollSocketChannelTest {
}
}
+ /**
+ * Designed to keep reading as long as autoread is enabled.
+ */
+ private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator {
+ @Override
+ public Handle newHandle() {
+ return new Handle() {
+ private ChannelConfig config;
+ private int attemptedBytesRead;
+ private int lastBytesRead;
+ @Override
+ public ByteBuf allocate(ByteBufAllocator alloc) {
+ return alloc.ioBuffer(guess());
+ }
+
+ @Override
+ public int guess() {
+ return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
+ }
+
+ @Override
+ public void reset(ChannelConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public void incMessagesRead(int numMessages) {
+ }
+
+ @Override
+ public void lastBytesRead(int bytes) {
+ lastBytesRead = bytes;
+ }
+
+ @Override
+ public int lastBytesRead() {
+ return lastBytesRead;
+ }
+
+ @Override
+ public void attemptedBytesRead(int bytes) {
+ attemptedBytesRead = bytes;
+ }
+
+ @Override
+ public int attemptedBytesRead() {
+ return attemptedBytesRead;
+ }
+
+ @Override
+ public boolean continueReading() {
+ return config.isAutoRead();
+ }
+
+ @Override
+ public void readComplete() {
+ }
+ };
+ }
+ }
+
+ private static class AutoReadInitializer extends ChannelInitializer {
+ final AutoReadHandler autoReadHandler = new AutoReadHandler();
+ volatile Channel channel;
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ channel = ch;
+ ch.pipeline().addLast(autoReadHandler);
+ }
+ }
+
private static class MyInitializer extends ChannelInitializer {
final ExceptionHandler exceptionHandler = new ExceptionHandler();
@Override
@@ -187,6 +334,36 @@ public class EpollSocketChannelTest {
}
}
+ private static final class AutoReadHandler extends ChannelInboundHandlerAdapter {
+ private final AtomicInteger count = new AtomicInteger();
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final CountDownLatch latch2 = new CountDownLatch(2);
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ ReferenceCountUtil.release(msg);
+ if (count.incrementAndGet() == 1) {
+ ctx.channel().config().setAutoRead(false);
+ }
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ latch.countDown();
+ latch2.countDown();
+ }
+
+ void assertSingleRead() throws InterruptedException {
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertEquals(1, count.get());
+ }
+
+ void assertSingleReadSecondTry() throws InterruptedException {
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+ assertEquals(2, count.get());
+ }
+ }
+
private static class ExceptionHandler extends ChannelInboundHandlerAdapter {
final AtomicLong count = new AtomicLong();
/**