From bfbef036a8c1121083b485a98b9cb04a84e7dfea Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Sat, 27 Feb 2016 17:56:41 -0800 Subject: [PATCH] EPOLL ET AutoRead Motivation: EPOLL does not support autoread when in ET mode. Modifications: - EpollRecvByteAllocatorHandle should not unconditionally force reading just because ET is enabled - AbstractEpollChannel and all derived classes which implement epollInReady must support a variable which indicates there may be more data to read. The variable will be used when read is called to simulate a EPOLL wakeup and call epollInReady if necessary. This will ensure that if we don't read until EAGAIN that we will try to read again and not rely on EPOLL to notify us. Result: EPOLL ET supports auto read. --- .../channel/epoll/AbstractEpollChannel.java | 33 +++- .../epoll/AbstractEpollServerChannel.java | 37 ++-- .../epoll/AbstractEpollStreamChannel.java | 33 ++-- .../channel/epoll/EpollDatagramChannel.java | 28 +-- .../epoll/EpollDomainSocketChannel.java | 45 +++-- .../epoll/EpollRecvByteAllocatorHandle.java | 43 +++-- .../EpollRecvByteAllocatorMessageHandle.java | 39 ---- ...EpollRecvByteAllocatorStreamingHandle.java | 17 +- .../channel/epoll/EpollSocketChannelTest.java | 177 ++++++++++++++++++ 9 files changed, 300 insertions(+), 152 deletions(-) delete mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index f01e39e3e3..4fd3ba07b4 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; @@ -135,11 +136,18 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } @Override - protected void doBeginRead() throws Exception { + protected final void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called - ((AbstractEpollUnsafe) unsafe()).readPending = true; + AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); + unsafe.readPending = true; setFlag(readFlag); + + // If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified + // again if we didn't consume all the data. So we force a read operation here if there maybe more data. + if (unsafe.maybeMoreDataToRead) { + unsafe.epollInReady(); + } } final void clearEpollIn() { @@ -299,6 +307,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected boolean readPending; + protected boolean maybeMoreDataToRead; private EpollRecvByteAllocatorHandle allocHandle; /** @@ -306,6 +315,22 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann */ abstract void epollInReady(); + final void epollInReadAttempted() { + readPending = maybeMoreDataToRead = false; + } + + final void epollInFinally(ChannelConfig config) { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (!readPending && !config.isAutoRead()) { + clearEpollIn(); + } + } + /** * Will schedule a {@link #epollInReady()} call on the event loop if necessary. * @param edgeTriggered {@code true} if the channel is using ET mode. {@code false} otherwise. @@ -390,7 +415,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * Create a new {@EpollRecvByteAllocatorHandle} instance. * @param handle The handle to wrap with EPOLL specific logic. */ - protected abstract EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle); + EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { + return new EpollRecvByteAllocatorHandle(handle, config()); + } @Override protected void flush0() { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index ed41f3a526..2a67ad3938 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; -import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.ServerChannel; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; @@ -104,65 +103,55 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im channelPromise.setFailure(new UnsupportedOperationException()); } - @Override - protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { - return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET)); - } - @Override void epollInReady() { assert eventLoop().inEventLoop(); if (fd().isInputShutdown()) { return; } - boolean edgeTriggered = isFlagSet(Native.EPOLLET); - + final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); final ChannelConfig config = config(); - if (!readPending && !edgeTriggered && !config.isAutoRead()) { + if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) { // ChannelConfig.setAutoRead(false) was called in the meantime clearEpollIn0(); return; } final ChannelPipeline pipeline = pipeline(); - final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); Throwable exception = null; try { try { do { - int socketFd = fd().accept(acceptedAddress); - if (socketFd == -1) { + // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the + // EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is + // enabled. + allocHandle.lastBytesRead(fd().accept(acceptedAddress)); + epollInReadAttempted(); + if (allocHandle.lastBytesRead() == -1) { // this means everything was handled for now break; } - readPending = false; allocHandle.incMessagesRead(1); int len = acceptedAddress[0]; - pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len)); + pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1, len)); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } allocHandle.readComplete(); + maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); pipeline.fireChannelReadComplete(); if (exception != null) { pipeline.fireExceptionCaught(exception); - checkResetEpollIn(edgeTriggered); + checkResetEpollIn(allocHandle.isEdgeTriggered()); } } finally { - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!readPending && !config.isAutoRead()) { - clearEpollIn0(); - } + epollInFinally(config); } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 78aa2ef800..dc46213347 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -660,7 +660,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im return super.prepareToClose(); } - private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { + private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close, + EpollRecvByteAllocatorHandle allocHandle) { if (byteBuf != null) { if (byteBuf.isReadable()) { readPending = false; @@ -669,7 +670,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im byteBuf.release(); } } - recvBufAllocHandle().readComplete(); + allocHandle.readComplete(); + maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { @@ -816,8 +818,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } @Override - protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { - return new EpollRecvByteAllocatorStreamingHandle(handle, isFlagSet(Native.EPOLLET)); + EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { + return new EpollRecvByteAllocatorStreamingHandle(handle, config()); } @Override @@ -826,9 +828,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im return; } final ChannelConfig config = config(); - boolean edgeTriggered = isFlagSet(Native.EPOLLET); + final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); - if (!readPending && !edgeTriggered && !config.isAutoRead()) { + if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) { // ChannelConfig.setAutoRead(false) was called in the meantime clearEpollIn0(); return; @@ -836,7 +839,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); - final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; @@ -863,6 +865,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im // to handle direct buffers. byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); + epollInReadAttempted(); if (allocHandle.lastBytesRead() <= 0) { // nothing was read, release the buffer. byteBuf.release(); @@ -870,13 +873,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im close = allocHandle.lastBytesRead() < 0; break; } - readPending = false; allocHandle.incMessagesRead(1); pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); + maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); pipeline.fireChannelReadComplete(); if (close) { @@ -884,18 +887,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im close = false; } } catch (Throwable t) { - handleReadException(pipeline, byteBuf, t, close); - checkResetEpollIn(edgeTriggered); + handleReadException(pipeline, byteBuf, t, close, allocHandle); + checkResetEpollIn(allocHandle.isEdgeTriggered()); } finally { - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!readPending && !config.isAutoRead()) { - clearEpollIn0(); - } + epollInFinally(config); } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index ff3beb8dc0..e540328b77 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -26,7 +26,6 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultAddressedEnvelope; -import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannelConfig; import io.netty.channel.socket.DatagramPacket; @@ -519,11 +518,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } } - @Override - protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { - return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET)); - } - @Override void epollInReady() { assert eventLoop().inEventLoop(); @@ -531,9 +525,10 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements return; } DatagramChannelConfig config = config(); - boolean edgeTriggered = isFlagSet(Native.EPOLLET); + final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); - if (!readPending && !edgeTriggered && !config.isAutoRead()) { + if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) { // ChannelConfig.setAutoRead(false) was called in the meantime clearEpollIn0(); return; @@ -541,7 +536,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); - final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); Throwable exception = null; @@ -561,7 +555,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements remoteAddress = fd().recvFrom(nioData, nioData.position(), nioData.limit()); } + epollInReadAttempted(); if (remoteAddress == null) { + allocHandle.lastBytesRead(-1); data.release(); data = null; break; @@ -570,7 +566,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements allocHandle.incMessagesRead(1); allocHandle.lastBytesRead(remoteAddress.receivedAmount()); data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead()); - readPending = false; readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); data = null; @@ -589,22 +584,15 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } readBuf.clear(); allocHandle.readComplete(); + maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); pipeline.fireChannelReadComplete(); if (exception != null) { pipeline.fireExceptionCaught(exception); - checkResetEpollIn(edgeTriggered); + checkResetEpollIn(allocHandle.isEdgeTriggered()); } } finally { - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!readPending && !config.isAutoRead()) { - clearEpollIn(); - } + epollInFinally(config); } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index 489a21c9c5..e68486f5fc 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -23,7 +23,6 @@ import io.netty.channel.unix.DomainSocketAddress; import io.netty.channel.unix.DomainSocketChannel; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; -import io.netty.util.internal.OneTimeTask; import java.net.SocketAddress; @@ -152,51 +151,49 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i if (fd().isInputShutdown()) { return; } - boolean edgeTriggered = isFlagSet(Native.EPOLLET); + final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); final ChannelConfig config = config(); - if (!readPending && !edgeTriggered && !config.isAutoRead()) { + if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) { // ChannelConfig.setAutoRead(false) was called in the meantime clearEpollIn0(); return; } final ChannelPipeline pipeline = pipeline(); - final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); try { - do { - int socketFd = Native.recvFd(fd().intValue()); - if (socketFd == 0) { - break; - } - if (socketFd == -1) { + readLoop: do { + // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the + // EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is + // enabled. + allocHandle.lastBytesRead(Native.recvFd(fd().intValue())); + epollInReadAttempted(); + switch(allocHandle.lastBytesRead()) { + case 0: + break readLoop; + case -1: close(voidPromise()); return; + default: + allocHandle.incMessagesRead(1); + pipeline.fireChannelRead(new FileDescriptor(allocHandle.lastBytesRead())); + break; } - - readPending = false; - allocHandle.incMessagesRead(1); - pipeline.fireChannelRead(new FileDescriptor(socketFd)); } while (allocHandle.continueReading()); allocHandle.readComplete(); + maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); pipeline.fireChannelReadComplete(); } catch (Throwable t) { allocHandle.readComplete(); + maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(t); - checkResetEpollIn(edgeTriggered); + checkResetEpollIn(allocHandle.isEdgeTriggered()); } finally { - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!readPending && !config.isAutoRead()) { - clearEpollIn0(); - } + epollInFinally(config); } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java index f32b89c6e4..d11aaf108a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java @@ -15,26 +15,47 @@ */ package io.netty.channel.epoll; +import io.netty.channel.ChannelConfig; import io.netty.channel.RecvByteBufAllocator; -abstract class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle { - private final boolean isEdgeTriggered; +class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle { + private boolean isEdgeTriggered; + private final ChannelConfig config; private boolean receivedRdHup; - public EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) { + EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) { super(handle); - this.isEdgeTriggered = isEdgeTriggered; + this.config = config; } - public final boolean isEdgeTriggered() { - return isEdgeTriggered; - } - - public final void receivedRdHup() { + final void receivedRdHup() { receivedRdHup = true; } - public final boolean isRdHup() { - return receivedRdHup; + boolean maybeMoreDataToRead() { + return isEdgeTriggered && lastBytesRead() > 0; + } + + final void edgeTriggered(boolean edgeTriggered) { + isEdgeTriggered = edgeTriggered; + } + + final boolean isEdgeTriggered() { + return isEdgeTriggered; + } + + @Override + public final boolean continueReading() { + /** + * EPOLL ET requires that we read until we get an EAGAIN + * (see Q9 in epoll man). However in order to + * respect auto read we supporting reading to stop if auto read is off. If auto read is on we force reading to + * continue to avoid a {@link java.lang.StackOverflowError} between channelReadComplete and reading from the + * channel. It is expected that the {@link #EpollSocketChannel} implementations will track if we are in + * edgeTriggered mode and all data was not read, and will force a EPOLLIN ready event. + * + * If EPOLLRDHUP has been received we must read until we get a read error. + */ + return receivedRdHup || maybeMoreDataToRead() && config.isAutoRead() || super.continueReading(); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java deleted file mode 100644 index bb8a59afa9..0000000000 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorMessageHandle.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.epoll; - -import io.netty.channel.RecvByteBufAllocator; - -/** - * Respects termination conditions for EPOLL message (aka packet) based protocols. - */ -final class EpollRecvByteAllocatorMessageHandle extends EpollRecvByteAllocatorHandle { - public EpollRecvByteAllocatorMessageHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) { - super(handle, isEdgeTriggered); - } - - @Override - public boolean continueReading() { - /** - * If edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise. For - * packet oriented descriptors must read until we get a EAGAIN - * (see Q9 in epoll man). - * - * If EPOLLRDHUP has been received we must read until we get a read error. - */ - return isEdgeTriggered() || isRdHup() ? true : super.continueReading(); - } -} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java index b17dd75a8a..9746e74248 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java @@ -15,27 +15,20 @@ */ package io.netty.channel.epoll; +import io.netty.channel.ChannelConfig; import io.netty.channel.RecvByteBufAllocator; -/** - * EPOLL must read until no more data is available while in edge triggered mode. This class will always continue reading - * unless the last read did not fill up the available buffer space. - */ final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocatorHandle { - public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) { - super(handle, isEdgeTriggered); + public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) { + super(handle, config); } @Override - public boolean continueReading() { + boolean maybeMoreDataToRead() { /** - * if edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise. * For stream oriented descriptors we can assume we are done reading if the last read attempt didn't produce * a full buffer (see Q9 in epoll man). - * - * If EPOLLRDHUP has been received we must read until we get a read error. */ - return isRdHup() ? true : - isEdgeTriggered() ? lastBytesRead() == attemptedBytesRead() : super.continueReading(); + return isEdgeTriggered() && lastBytesRead() == attemptedBytesRead(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java index 109f388f07..56fa8acafc 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java @@ -17,14 +17,18 @@ package io.netty.channel.epoll; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; +import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.ServerChannel; import io.netty.util.ReferenceCountUtil; import org.junit.Assert; @@ -34,8 +38,10 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -128,6 +134,76 @@ public class EpollSocketChannelTest { } } + @Test + public void testAutoReadOffDuringReadOnlyReadsOneTime() throws InterruptedException { + EventLoopGroup group = new EpollEventLoopGroup(); + try { + runAutoReadTest(group, EpollServerSocketChannel.class, EpollSocketChannel.class, + new InetSocketAddress(0)); + runAutoReadTest(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class, + EpollSocketTestPermutation.newSocketAddress()); + } finally { + group.shutdownGracefully(); + } + } + + private void runAutoReadTest(EventLoopGroup group, Class serverChannelClass, + Class channelClass, SocketAddress bindAddr) throws InterruptedException { + Channel serverChannel = null; + Channel clientChannel = null; + try { + AutoReadInitializer serverInitializer = new AutoReadInitializer(); + AutoReadInitializer clientInitializer = new AutoReadInitializer(); + ServerBootstrap sb = new ServerBootstrap(); + sb.option(ChannelOption.SO_BACKLOG, 1024) + .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .option(ChannelOption.AUTO_READ, true) + .group(group) + .channel(serverChannelClass) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(ChannelOption.AUTO_READ, true) + // We want to ensure that we attempt multiple individual read operations per read loop so we can + // test the auto read feature being turned off when data is first read. + .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) + .childHandler(serverInitializer); + + serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel(); + + Bootstrap b = new Bootstrap() + .group(group) + .channel(channelClass) + .remoteAddress(serverChannel.localAddress()) + .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .option(ChannelOption.AUTO_READ, true) + // We want to ensure that we attempt multiple individual read operations per read loop so we can + // test the auto read feature being turned off when data is first read. + .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) + .handler(clientInitializer); + clientChannel = b.connect().syncUninterruptibly().channel(); + + // 3 bytes means 3 independent reads for TestRecvByteBufAllocator + clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); + serverInitializer.autoReadHandler.assertSingleRead(); + + // 3 bytes means 3 independent reads for TestRecvByteBufAllocator + serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); + clientInitializer.autoReadHandler.assertSingleRead(); + + serverInitializer.channel.read(); + serverInitializer.autoReadHandler.assertSingleReadSecondTry(); + + clientChannel.read(); + clientInitializer.autoReadHandler.assertSingleReadSecondTry(); + } finally { + if (serverChannel != null) { + serverChannel.close().syncUninterruptibly(); + } + if (clientChannel != null) { + clientChannel.close().syncUninterruptibly(); + } + } + } + private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class serverChannelClass, Class channelClass, SocketAddress bindAddr) throws InterruptedException { Channel serverChannel = null; @@ -168,6 +244,77 @@ public class EpollSocketChannelTest { } } + /** + * Designed to keep reading as long as autoread is enabled. + */ + private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator { + @Override + public Handle newHandle() { + return new Handle() { + private ChannelConfig config; + private int attemptedBytesRead; + private int lastBytesRead; + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return alloc.ioBuffer(guess()); + } + + @Override + public int guess() { + return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. + } + + @Override + public void reset(ChannelConfig config) { + this.config = config; + } + + @Override + public void incMessagesRead(int numMessages) { + } + + @Override + public void lastBytesRead(int bytes) { + lastBytesRead = bytes; + } + + @Override + public int lastBytesRead() { + return lastBytesRead; + } + + @Override + public void attemptedBytesRead(int bytes) { + attemptedBytesRead = bytes; + } + + @Override + public int attemptedBytesRead() { + return attemptedBytesRead; + } + + @Override + public boolean continueReading() { + return config.isAutoRead(); + } + + @Override + public void readComplete() { + } + }; + } + } + + private static class AutoReadInitializer extends ChannelInitializer { + final AutoReadHandler autoReadHandler = new AutoReadHandler(); + volatile Channel channel; + @Override + protected void initChannel(Channel ch) throws Exception { + channel = ch; + ch.pipeline().addLast(autoReadHandler); + } + } + private static class MyInitializer extends ChannelInitializer { final ExceptionHandler exceptionHandler = new ExceptionHandler(); @Override @@ -187,6 +334,36 @@ public class EpollSocketChannelTest { } } + private static final class AutoReadHandler extends ChannelInboundHandlerAdapter { + private final AtomicInteger count = new AtomicInteger(); + private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch2 = new CountDownLatch(2); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.release(msg); + if (count.incrementAndGet() == 1) { + ctx.channel().config().setAutoRead(false); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + latch2.countDown(); + } + + void assertSingleRead() throws InterruptedException { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertEquals(1, count.get()); + } + + void assertSingleReadSecondTry() throws InterruptedException { + assertTrue(latch2.await(5, TimeUnit.SECONDS)); + assertEquals(2, count.get()); + } + } + private static class ExceptionHandler extends ChannelInboundHandlerAdapter { final AtomicLong count = new AtomicLong(); /**